| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nutch.indexwriter.elastic; |
| |
| import java.lang.invoke.MethodHandles; |
| import java.time.format.DateTimeFormatter; |
| import java.io.IOException; |
| import java.util.AbstractMap; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.TimeUnit; |
| |
| import javax.net.ssl.SSLContext; |
| |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.http.HttpHost; |
| import org.apache.http.auth.AuthScope; |
| import org.apache.http.auth.UsernamePasswordCredentials; |
| import org.apache.http.client.CredentialsProvider; |
| import org.apache.http.conn.ssl.NoopHostnameVerifier; |
| import org.apache.http.conn.ssl.TrustSelfSignedStrategy; |
| import org.apache.http.impl.client.BasicCredentialsProvider; |
| import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; |
| import org.apache.http.ssl.SSLContextBuilder; |
| import org.apache.http.ssl.SSLContexts; |
| import org.apache.nutch.indexer.IndexWriter; |
| import org.apache.nutch.indexer.IndexWriterParams; |
| import org.apache.nutch.indexer.NutchDocument; |
| import org.apache.nutch.indexer.NutchField; |
| import org.apache.nutch.util.StringUtil; |
| import org.elasticsearch.action.bulk.BulkResponse; |
| import org.elasticsearch.action.DocWriteRequest; |
| import org.elasticsearch.action.bulk.BackoffPolicy; |
| import org.elasticsearch.action.bulk.BulkProcessor; |
| import org.elasticsearch.action.bulk.BulkRequest; |
| import org.elasticsearch.action.delete.DeleteRequest; |
| import org.elasticsearch.action.index.IndexRequest; |
| import org.elasticsearch.client.RestClient; |
| import org.elasticsearch.client.RestClientBuilder; |
| import org.elasticsearch.client.RestHighLevelClient; |
| import org.elasticsearch.common.unit.ByteSizeUnit; |
| import org.elasticsearch.common.unit.ByteSizeValue; |
| import org.elasticsearch.common.unit.TimeValue; |
| import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback; |
| import org.elasticsearch.common.xcontent.XContentBuilder; |
| import org.elasticsearch.common.xcontent.XContentFactory; |
| import org.elasticsearch.client.RequestOptions; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Sends NutchDocuments to a configured Elasticsearch index. |
| */ |
| public class ElasticIndexWriter implements IndexWriter { |
| private static final Logger LOG = LoggerFactory |
| .getLogger(MethodHandles.lookup().lookupClass()); |
| |
| private static final int DEFAULT_PORT = 9300; |
| private static final int DEFAULT_MAX_BULK_DOCS = 250; |
| private static final int DEFAULT_MAX_BULK_LENGTH = 2500500; |
| private static final int DEFAULT_EXP_BACKOFF_MILLIS = 100; |
| private static final int DEFAULT_EXP_BACKOFF_RETRIES = 10; |
| private static final int DEFAULT_BULK_CLOSE_TIMEOUT = 600; |
| private static final String DEFAULT_INDEX = "nutch"; |
| private static final String DEFAULT_USER = "elastic"; |
| |
| private String[] hosts; |
| private int port; |
| private String scheme = HttpHost.DEFAULT_SCHEME_NAME; |
| private String user = null; |
| private String password = null; |
| private boolean auth; |
| |
| private int maxBulkDocs; |
| private int maxBulkLength; |
| private int expBackoffMillis; |
| private int expBackoffRetries; |
| |
| private String defaultIndex; |
| private RestHighLevelClient client; |
| private BulkProcessor bulkProcessor; |
| |
| private long bulkCloseTimeout; |
| |
| private Configuration config; |
| |
| @Override |
| public void open(Configuration conf, String name) throws IOException { |
| // Implementation not required |
| } |
| |
| /** |
| * Initializes the internal variables from a given index writer configuration. |
| * |
| * @param parameters |
| * Params from the index writer configuration. |
| * @throws IOException |
| * Some exception thrown by writer. |
| */ |
| @Override |
| public void open(IndexWriterParams parameters) throws IOException { |
| |
| String hosts = parameters.get(ElasticConstants.HOSTS); |
| |
| if (StringUtils.isBlank(hosts)) { |
| String message = "Missing elastic.host this should be set in index-writers.xml "; |
| message += "\n" + describe(); |
| LOG.error(message); |
| throw new RuntimeException(message); |
| } |
| |
| bulkCloseTimeout = parameters.getLong(ElasticConstants.BULK_CLOSE_TIMEOUT, |
| DEFAULT_BULK_CLOSE_TIMEOUT); |
| defaultIndex = parameters.get(ElasticConstants.INDEX, DEFAULT_INDEX); |
| |
| maxBulkDocs = parameters.getInt(ElasticConstants.MAX_BULK_DOCS, |
| DEFAULT_MAX_BULK_DOCS); |
| maxBulkLength = parameters.getInt(ElasticConstants.MAX_BULK_LENGTH, |
| DEFAULT_MAX_BULK_LENGTH); |
| expBackoffMillis = parameters.getInt( |
| ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS, |
| DEFAULT_EXP_BACKOFF_MILLIS); |
| expBackoffRetries = parameters.getInt( |
| ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES, |
| DEFAULT_EXP_BACKOFF_RETRIES); |
| |
| client = makeClient(parameters); |
| |
| LOG.debug("Creating BulkProcessor with maxBulkDocs={}, maxBulkLength={}", |
| maxBulkDocs, maxBulkLength); |
| bulkProcessor = BulkProcessor |
| .builder( |
| (request, bulkListener) -> client.bulkAsync(request, |
| RequestOptions.DEFAULT, bulkListener), |
| bulkProcessorListener(), "nutch-indexer-elastic") |
| .setBulkActions(maxBulkDocs) |
| .setBulkSize(new ByteSizeValue(maxBulkLength, ByteSizeUnit.BYTES)) |
| .setConcurrentRequests(1) |
| .setBackoffPolicy(BackoffPolicy.exponentialBackoff( |
| TimeValue.timeValueMillis(expBackoffMillis), expBackoffRetries)) |
| .build(); |
| } |
| |
| /** |
| * Generates a RestHighLevelClient with the hosts given |
| * @param parameters implementation specific {@link org.apache.nutch.indexer.IndexWriterParams} |
| * @return an initialized {@link org.elasticsearch.client.RestHighLevelClient} |
| * @throws IOException if there is an error reading the |
| * {@link org.apache.nutch.indexer.IndexWriterParams} |
| */ |
| protected RestHighLevelClient makeClient(IndexWriterParams parameters) |
| throws IOException { |
| hosts = parameters.getStrings(ElasticConstants.HOSTS); |
| port = parameters.getInt(ElasticConstants.PORT, DEFAULT_PORT); |
| scheme = parameters.get(ElasticConstants.SCHEME, HttpHost.DEFAULT_SCHEME_NAME); |
| auth = parameters.getBoolean(ElasticConstants.USE_AUTH, false); |
| user = parameters.get(ElasticConstants.USER, DEFAULT_USER); |
| password = parameters.get(ElasticConstants.PASSWORD, ""); |
| |
| final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); |
| credentialsProvider.setCredentials(AuthScope.ANY, |
| new UsernamePasswordCredentials(user, password)); |
| |
| RestHighLevelClient client = null; |
| |
| if (hosts != null && port > 1) { |
| HttpHost[] hostsList = new HttpHost[hosts.length]; |
| int i = 0; |
| for (String host : hosts) { |
| hostsList[i++] = new HttpHost(host, port, scheme); |
| } |
| RestClientBuilder restClientBuilder = RestClient.builder(hostsList); |
| |
| if (auth) { |
| restClientBuilder |
| .setHttpClientConfigCallback(new HttpClientConfigCallback() { |
| @Override |
| public HttpAsyncClientBuilder customizeHttpClient( |
| HttpAsyncClientBuilder arg0) { |
| return arg0.setDefaultCredentialsProvider(credentialsProvider); |
| } |
| }); |
| } |
| |
| // In case of HTTPS, set the client up for ignoring problems with self-signed |
| // certificates and stuff |
| if ("https".equals(scheme)) { |
| try { |
| SSLContextBuilder sslBuilder = SSLContexts.custom(); |
| sslBuilder.loadTrustMaterial(null, new TrustSelfSignedStrategy()); |
| final SSLContext sslContext = sslBuilder.build(); |
| |
| restClientBuilder.setHttpClientConfigCallback(new HttpClientConfigCallback() { |
| @Override |
| public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) { |
| // ignore issues with self-signed certificates |
| httpClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE); |
| return httpClientBuilder.setSSLContext(sslContext); |
| } |
| }); |
| } catch (Exception e) { |
| LOG.error("Error setting up SSLContext because: " + e.getMessage(), e); |
| } |
| } |
| |
| client = new RestHighLevelClient(restClientBuilder); |
| } else { |
| throw new IOException( |
| "ElasticRestClient initialization Failed!!!\\n\\nPlease Provide the hosts"); |
| } |
| |
| return client; |
| } |
| |
| /** |
| * Generates a default BulkProcessor.Listener |
| * @return {@link BulkProcessor.Listener} |
| */ |
| protected BulkProcessor.Listener bulkProcessorListener() { |
| return new BulkProcessor.Listener() { |
| @Override |
| public void beforeBulk(long executionId, BulkRequest request) { |
| } |
| |
| @Override |
| public void afterBulk(long executionId, BulkRequest request, |
| Throwable failure) { |
| LOG.error("Elasticsearch indexing failed:", failure); |
| } |
| |
| @Override |
| public void afterBulk(long executionId, BulkRequest request, |
| BulkResponse response) { |
| if (response.hasFailures()) { |
| LOG.warn("Failures occurred during bulk request: {}", |
| response.buildFailureMessage()); |
| } |
| } |
| }; |
| } |
| |
| @Override |
| public void write(NutchDocument doc) throws IOException { |
| String id = (String) doc.getFieldValue("id"); |
| String type = doc.getDocumentMeta().get("type"); |
| if (type == null) |
| type = "doc"; |
| |
| // Add each field of this doc to the index builder |
| XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); |
| for (final Map.Entry<String, NutchField> e : doc) { |
| final List<Object> values = e.getValue().getValues(); |
| |
| if (values.size() > 1) { |
| builder.array(e.getKey(), values); |
| } else { |
| Object value = values.get(0); |
| if (value instanceof java.util.Date) { |
| value = DateTimeFormatter.ISO_INSTANT |
| .format(((java.util.Date) value).toInstant()); |
| } |
| builder.field(e.getKey(), value); |
| } |
| } |
| builder.endObject(); |
| |
| IndexRequest request = new IndexRequest(defaultIndex).id(id) |
| .source(builder); |
| request.opType(DocWriteRequest.OpType.INDEX); |
| |
| bulkProcessor.add(request); |
| } |
| |
| @Override |
| public void delete(String key) throws IOException { |
| DeleteRequest request = new DeleteRequest(defaultIndex, key); |
| bulkProcessor.add(request); |
| } |
| |
| @Override |
| public void update(NutchDocument doc) throws IOException { |
| write(doc); |
| } |
| |
| @Override |
| public void commit() throws IOException { |
| bulkProcessor.flush(); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| // Close BulkProcessor (automatically flushes) |
| try { |
| bulkProcessor.awaitClose(bulkCloseTimeout, TimeUnit.SECONDS); |
| } catch (InterruptedException e) { |
| LOG.warn("interrupted while waiting for BulkProcessor to complete ({})", |
| e.getMessage()); |
| } |
| |
| client.close(); |
| } |
| |
| /** |
| * Returns {@link Map} with the specific parameters the IndexWriter instance |
| * can take. |
| * |
| * @return The values of each row. It must have the form |
| * <KEY,<DESCRIPTION,VALUE>>. |
| */ |
| @Override |
| public Map<String, Map.Entry<String, Object>> describe() { |
| Map<String, Map.Entry<String, Object>> properties = new LinkedHashMap<>(); |
| |
| properties.put(ElasticConstants.HOSTS, |
| new AbstractMap.SimpleEntry<>("Comma-separated list of hostnames", |
| this.hosts == null ? "" : String.join(",", hosts))); |
| properties.put(ElasticConstants.PORT, new AbstractMap.SimpleEntry<>( |
| "The port to connect to elastic server.", this.port)); |
| properties.put(ElasticConstants.SCHEME, new AbstractMap.SimpleEntry<>( |
| "The scheme (http or https) to connect to elastic server.", this.scheme)); |
| properties.put(ElasticConstants.INDEX, new AbstractMap.SimpleEntry<>( |
| "Default index to send documents to.", this.defaultIndex)); |
| properties.put(ElasticConstants.USER, new AbstractMap.SimpleEntry<>( |
| "Username for auth credentials", this.user)); |
| properties.put(ElasticConstants.PASSWORD, new AbstractMap.SimpleEntry<>( |
| "Password for auth credentials", StringUtil.mask(this.password))); |
| properties.put(ElasticConstants.MAX_BULK_DOCS, |
| new AbstractMap.SimpleEntry<>( |
| "Maximum size of the bulk in number of documents.", |
| this.maxBulkDocs)); |
| properties.put(ElasticConstants.MAX_BULK_LENGTH, |
| new AbstractMap.SimpleEntry<>("Maximum size of the bulk in bytes.", |
| this.maxBulkLength)); |
| properties.put(ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS, |
| new AbstractMap.SimpleEntry<>( |
| "Initial delay for the BulkProcessor exponential backoff policy.", |
| this.expBackoffMillis)); |
| properties.put(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES, |
| new AbstractMap.SimpleEntry<>( |
| "Number of times the BulkProcessor exponential backoff policy should retry bulk operations.", |
| this.expBackoffRetries)); |
| properties.put(ElasticConstants.BULK_CLOSE_TIMEOUT, |
| new AbstractMap.SimpleEntry<>( |
| "Number of seconds allowed for the BulkProcessor to complete its last operation.", |
| this.bulkCloseTimeout)); |
| |
| return properties; |
| } |
| |
| @Override |
| public void setConf(Configuration conf) { |
| config = conf; |
| } |
| |
| @Override |
| public Configuration getConf() { |
| return config; |
| } |
| } |