blob: 0ddf539d3b3f0b994c3d5fbba14567e7813f87f9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nutch.indexwriter.elasticrest;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.JestResult;
import io.searchbox.client.JestResultHandler;
import io.searchbox.client.config.HttpClientConfig;
import io.searchbox.core.Bulk;
import io.searchbox.core.BulkResult;
import io.searchbox.core.Delete;
import io.searchbox.core.Index;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.http.concurrent.BasicFuture;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.nio.conn.SchemeIOSessionStrategy;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.TrustStrategy;
import org.apache.nutch.indexer.IndexWriter;
import org.apache.nutch.indexer.IndexWriterParams;
import org.apache.nutch.indexer.NutchDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.URL;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.AbstractMap;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
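/**
 * {@link IndexWriter} that sends Nutch documents to Elasticsearch over REST
 * using the Jest client. Index requests are buffered into a bulk request that
 * is flushed asynchronously once a document-count or size threshold is
 * reached. When a list of languages is configured, documents are routed to
 * per-language indices, with a "sink" index for all other languages.
 */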
public class ElasticRestIndexWriter implements IndexWriter {
/** Logger used by this writer. */
public static Logger LOG = LoggerFactory
.getLogger(ElasticRestIndexWriter.class);
/** Default for the maximum number of documents per bulk request, used by open(). */
private static final int DEFAULT_MAX_BULK_DOCS = 250;
/** Default for the maximum accumulated bulk length, used by open(). */
private static final int DEFAULT_MAX_BULK_LENGTH = 2500500;
/** Default separator placed between the index name and the language/sink suffix. */
private static final String DEFAULT_SEPARATOR = "_";
/** Default suffix of the index that receives documents with an unsupported language. */
private static final String DEFAULT_SINK = "others";
/** Jest client; created in open(IndexWriterParams) and shut down in close(). */
private JestClient client;
/** Index used when no languages are configured; also the prefix of language/sink index names. */
private String defaultIndex;
/** Document type used when a document's metadata carries no "type" entry. */
private String defaultType = null;
/** Hadoop configuration handed in through setConf(). */
private Configuration config;
/** Collects index actions until commit() sends them. */
private Bulk.Builder bulkBuilder;
// Connection and security settings, read from the writer parameters in open().
private int port = -1;
private String host = null;
private Boolean https = null;
private String user = null;
private String password = null;
private Boolean trustAllHostnames = null;
// Thresholds that make write() trigger a flush.
private int maxBulkDocs;
private int maxBulkLength;
/** Total number of documents passed to write() so far. */
private long indexedDocs = 0;
/** Number of documents in the bulk currently being built. */
private int bulkDocs = 0;
/** Accumulated string length of the field values in the bulk currently being built. */
private int bulkLength = 0;
/** Tells commit() whether to prepare a fresh bulk builder after flushing. */
private boolean createNewBulk = false;
/** Timing bookkeeping for the most recent bulk request; logged by commit(). */
private long millis;
/** Completed by the asynchronous bulk callback; commit() waits on it before the next flush. */
private BasicFuture<JestResult> basicFuture = null;
/** Supported languages; when non-empty, documents are routed to per-language indices. */
private String[] languages = null;
/** Separator used when composing language/sink index names. */
private String separator = null;
/** Suffix of the index for documents whose language is not in {@link #languages}. */
private String sink = null;
/**
 * Intentionally a no-op; this writer is set up through
 * {@link #open(IndexWriterParams)}.
 *
 * @param conf ignored
 * @param name ignored
 * @throws IOException never thrown by this implementation
 */
@Override
public void open(Configuration conf, String name) throws IOException {
  // Nothing to do here: no implementation is required for this variant.
}
/**
 * Initializes the writer from its parameters: reads the connection, security,
 * language-routing and bulk settings, builds the Jest client and prepares the
 * first bulk request.
 *
 * @param parameters the parameters configured for this writer
 * @throws IOException if the URL of the Elasticsearch node cannot be built
 * @throws RuntimeException if no host is configured
 * @throws SecurityException if the SSL context cannot be created
 * @throws IllegalStateException if the configured port is not greater than 1
 */
@Override
public void open(IndexWriterParams parameters) throws IOException {
  host = parameters.get(ElasticRestConstants.HOST);
  if (StringUtils.isBlank(host)) {
    String message = "Missing host. It should be set in index-writers.xml";
    message += "\n" + describe();
    LOG.error(message);
    throw new RuntimeException(message);
  }

  port = parameters.getInt(ElasticRestConstants.PORT, 9200);
  user = parameters.get(ElasticRestConstants.USER);
  password = parameters.get(ElasticRestConstants.PASSWORD);
  https = parameters.getBoolean(ElasticRestConstants.HTTPS, false);
  trustAllHostnames = parameters
      .getBoolean(ElasticRestConstants.HOSTNAME_TRUST, false);
  languages = parameters.getStrings(ElasticRestConstants.LANGUAGES);
  separator = parameters
      .get(ElasticRestConstants.SEPARATOR, DEFAULT_SEPARATOR);
  sink = parameters.get(ElasticRestConstants.SINK, DEFAULT_SINK);

  // NOTE(review): the SSL context below trusts ALL certificates; only the
  // hostname check can be switched on or off through HOSTNAME_TRUST.
  SSLContext sslContext = createTrustAllSslContext();

  // Skip hostname checks only when explicitly requested.
  HostnameVerifier hostnameVerifier = trustAllHostnames
      ? NoopHostnameVerifier.INSTANCE
      : new DefaultHostnameVerifier();

  SSLConnectionSocketFactory sslSocketFactory = new SSLConnectionSocketFactory(
      sslContext);
  SchemeIOSessionStrategy httpsIOSessionStrategy = new SSLIOSessionStrategy(
      sslContext, hostnameVerifier);

  JestClientFactory jestClientFactory = new JestClientFactory();
  URL urlOfElasticsearchNode = new URL(https ? "https" : "http", host, port,
      "");

  // The host was already validated above, so only the port is left to check.
  if (port <= 1) {
    throw new IllegalStateException(
        "No host or port specified. Please set the host and port in index-writers.xml");
  }

  HttpClientConfig.Builder builder = new HttpClientConfig.Builder(
      urlOfElasticsearchNode.toString()).multiThreaded(true)
      .connTimeout(300000).readTimeout(300000);
  if (https) {
    // Credentials are only applied when https is enabled.
    if (user != null && password != null) {
      builder.defaultCredentials(user, password);
    }
    builder.defaultSchemeForDiscoveredNodes("https")
        .sslSocketFactory(sslSocketFactory) // this only affects sync calls
        .httpsIOSessionStrategy(
            httpsIOSessionStrategy); // this only affects async calls
  }
  jestClientFactory.setHttpClientConfig(builder.build());
  client = jestClientFactory.getObject();

  defaultIndex = parameters.get(ElasticRestConstants.INDEX, "nutch");
  defaultType = parameters.get(ElasticRestConstants.TYPE, "doc");
  maxBulkDocs = parameters
      .getInt(ElasticRestConstants.MAX_BULK_DOCS, DEFAULT_MAX_BULK_DOCS);
  maxBulkLength = parameters
      .getInt(ElasticRestConstants.MAX_BULK_LENGTH, DEFAULT_MAX_BULK_LENGTH);

  bulkBuilder = new Bulk.Builder().defaultIndex(defaultIndex)
      .defaultType(defaultType);
}

/**
 * Builds an SSL context whose trust strategy accepts every certificate chain.
 *
 * @return the SSL context
 * @throws SecurityException if the context cannot be created; the original
 *         failure is kept as the cause
 */
private static SSLContext createTrustAllSslContext() {
  try {
    return new SSLContextBuilder()
        .loadTrustMaterial(new TrustStrategy() {
          @Override
          public boolean isTrusted(X509Certificate[] chain, String authType)
              throws CertificateException {
            return true;
          }
        }).build();
  } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) {
    LOG.error("Failed to instantiate sslcontext object: \n{}",
        ExceptionUtils.getStackTrace(e));
    // Keep the cause so callers can see why the context could not be built.
    throw new SecurityException("Failed to instantiate sslcontext object", e);
  }
}
/**
 * Brings a field value into the form that is put into the document source:
 * {@code null}, maps and dates are passed through unchanged, every other value
 * is replaced by its string representation.
 *
 * @param value the raw field value, may be {@code null}
 * @return the value itself or its {@code toString()} form
 */
private static Object normalizeValue(Object value) {
  final boolean passThrough = value == null
      || value instanceof Map
      || value instanceof Date;
  return passThrough ? value : value.toString();
}
/**
 * Adds a document to the current bulk request and flushes the bulk once the
 * configured document count or length threshold is reached.
 *
 * <p>Field values are de-duplicated and normalized. A field with several
 * values is stored as an array, a field with one value is stored as that
 * value, and a field without values is omitted. {@code null} values are
 * tolerated and do not count towards the bulk length.
 *
 * @param doc the document to index
 * @throws IOException if flushing the bulk fails
 */
@Override
public void write(NutchDocument doc) throws IOException {
  String id = (String) doc.getFieldValue("id");
  String type = doc.getDocumentMeta().get("type");
  if (type == null) {
    type = defaultType;
  }

  Map<String, Object> source = new HashMap<String, Object>();
  // Loop through all fields of this doc
  for (String fieldName : doc.getFieldNames()) {
    Set<Object> allFieldValues = new LinkedHashSet<>(
        doc.getField(fieldName).getValues());
    if (allFieldValues.size() > 1) {
      Object[] normalizedFieldValues = allFieldValues.stream()
          .map(ElasticRestIndexWriter::normalizeValue).toArray();
      // Loop through the values to keep track of the size of this document
      for (Object value : normalizedFieldValues) {
        bulkLength += lengthOf(value);
      }
      source.put(fieldName, normalizedFieldValues);
    } else if (allFieldValues.size() == 1) {
      Object normalizedFieldValue = normalizeValue(
          allFieldValues.iterator().next());
      source.put(fieldName, normalizedFieldValue);
      bulkLength += lengthOf(normalizedFieldValue);
    }
  }

  String index = resolveIndexName(doc);
  Index indexRequest = new Index.Builder(source).index(index).type(type)
      .id(id).build();

  // Add this indexing request to a bulk request
  bulkBuilder.addAction(indexRequest);
  indexedDocs++;
  bulkDocs++;

  if (bulkDocs >= maxBulkDocs || bulkLength >= maxBulkLength) {
    LOG.info(
        "Processing bulk request [docs = {}, length = {}, total docs = {}, last doc in bulk = '{}']",
        bulkDocs, bulkLength, indexedDocs, id);
    // Flush the bulk of indexing requests
    createNewBulk = true;
    commit();
  }
}

/**
 * Returns the length a normalized value contributes to the bulk size;
 * {@code null} values contribute nothing.
 */
private static int lengthOf(Object normalizedValue) {
  return normalizedValue == null ? 0 : normalizedValue.toString().length();
}

/**
 * Chooses the target index of a document: the default index when no languages
 * are configured, otherwise the index of the document's "lang" value if that
 * language is supported, or the sink index if it is not.
 */
private String resolveIndexName(NutchDocument doc) {
  if (languages == null || languages.length == 0) {
    return defaultIndex;
  }
  String language = (String) doc.getFieldValue("lang");
  for (String supported : languages) {
    if (supported.equals(language)) {
      return getLanguageIndexName(language);
    }
  }
  return getSinkIndexName();
}
/**
 * Deletes the document with the given key. Without configured languages a
 * single delete is sent to the default index; otherwise one bulk request
 * deletes the key from every language index and from the sink index.
 *
 * @param key the id of the document to delete
 * @throws IOException if executing the request fails; the error is logged and
 *         rethrown
 */
@Override
public void delete(String key) throws IOException {
  final boolean perLanguageIndices = languages != null && languages.length > 0;
  try {
    if (!perLanguageIndices) {
      client.execute(
          new Delete.Builder(key).index(defaultIndex).type(defaultType)
              .build());
      return;
    }

    // The owning index is unknown, so delete from every candidate index.
    Bulk.Builder deletions = new Bulk.Builder().defaultType(defaultType);
    for (String language : languages) {
      deletions.addAction(
          new Delete.Builder(key).index(getLanguageIndexName(language))
              .type(defaultType).build());
    }
    deletions.addAction(
        new Delete.Builder(key).index(getSinkIndexName()).type(defaultType)
            .build());
    client.execute(deletions.build());
  } catch (IOException failure) {
    LOG.error(ExceptionUtils.getStackTrace(failure));
    throw failure;
  }
}
/**
 * Updates a document by writing it again through {@link #write(NutchDocument)}.
 *
 * @param doc the document to update
 * @throws IOException if writing fails; the error is logged and rethrown
 */
@Override
public void update(NutchDocument doc) throws IOException {
  try {
    write(doc);
  } catch (IOException failure) {
    final String trace = ExceptionUtils.getStackTrace(failure);
    LOG.error(trace);
    throw failure;
  }
}
/**
 * Flushes the buffered bulk request.
 *
 * <p>First waits for the previously submitted bulk (if any) to finish, then
 * submits the current bulk asynchronously when it contains documents, and
 * finally prepares a fresh bulk builder when {@code createNewBulk} is set.
 *
 * @throws IOException declared by the interface
 * @throws RuntimeException if the previously submitted bulk request failed
 */
@Override
public void commit() throws IOException {
  if (basicFuture != null) {
    // wait for previous to finish
    long beforeWait = System.currentTimeMillis();
    try {
      JestResult result = basicFuture.get();
      if (result == null) {
        // The failure callback completes the future with null.
        throw new RuntimeException(
            "Previous bulk request failed, see the logged error for details");
      }
      if (!result.isSucceeded()) {
        // The request went through but Elasticsearch reported a failure.
        LOG.warn("Previous bulk request reported errors: {}",
            result.getErrorMessage());
      }
      long msWaited = System.currentTimeMillis() - beforeWait;
      LOG.info("Previous took in ms {}, including wait {}", millis, msWaited);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can still observe it.
      Thread.currentThread().interrupt();
      LOG.error("Error waiting for result ", e);
    } catch (ExecutionException e) {
      LOG.error("Error waiting for result ", e);
    }
    basicFuture = null;
  }

  if (bulkBuilder != null) {
    if (bulkDocs > 0) {
      // start a flush, note that this is an asynchronous call
      // The callback works on captured locals, so it never depends on the
      // mutable fields that later commit() calls may reset or reuse.
      final BasicFuture<JestResult> pending = new BasicFuture<>(null);
      final long startedAt = System.currentTimeMillis();
      basicFuture = pending;
      client.executeAsync(bulkBuilder.build(),
          new JestResultHandler<BulkResult>() {
            @Override
            public void completed(BulkResult bulkResult) {
              // Record the duration before releasing the waiter, so the value
              // is in place when commit() logs it.
              millis = System.currentTimeMillis() - startedAt;
              pending.completed(bulkResult);
            }

            @Override
            public void failed(Exception e) {
              // Log first, so the cause is visible before the waiter throws.
              LOG.error("Failed result: ", e);
              pending.completed(null);
            }
          });
    }
    bulkBuilder = null;
  }

  if (createNewBulk) {
    // Prepare a new bulk request
    bulkBuilder = new Bulk.Builder().defaultIndex(defaultIndex)
        .defaultType(defaultType);
    bulkDocs = 0;
    bulkLength = 0;
  }
}
/**
 * Flushes all pending requests and shuts the client down.
 *
 * <p>{@link #commit()} is called twice: the first call submits the remaining
 * documents, the second waits for that last asynchronous bulk to finish. The
 * client is shut down even if flushing fails, so its resources are not leaked.
 *
 * @throws IOException if flushing fails
 */
@Override
public void close() throws IOException {
  try {
    // Flush pending requests
    LOG.info(
        "Processing remaining requests [docs = {}, length = {}, total docs = {}]",
        bulkDocs, bulkLength, indexedDocs);
    createNewBulk = false;
    commit();
    // flush one more time to finalize the last bulk
    LOG.info("Processing to finalize last execute");
    createNewBulk = false;
    commit();
  } finally {
    // Close; the client is null when the writer was never opened.
    if (client != null) {
      client.shutdownClient();
    }
  }
}
/**
 * Describes the parameters this IndexWriter instance accepts, together with
 * their current values.
 *
 * @return an insertion-ordered map with one row per parameter, of the form
 *         {@code <KEY,<DESCRIPTION,VALUE>>}
 */
@Override
public Map<String, Map.Entry<String, Object>> describe() {
  final Map<String, Map.Entry<String, Object>> rows = new LinkedHashMap<>();

  rows.put(ElasticRestConstants.HOST, describeEntry(
      "The hostname or a list of comma separated hostnames to send documents "
          + "to using Elasticsearch Jest. Both host and port must be defined.",
      this.host));
  rows.put(ElasticRestConstants.PORT, describeEntry(
      "The port to connect to using Elasticsearch Jest.", this.port));
  rows.put(ElasticRestConstants.INDEX, describeEntry(
      "Default index to send documents to.", this.defaultIndex));
  rows.put(ElasticRestConstants.MAX_BULK_DOCS, describeEntry(
      "Maximum size of the bulk in number of documents.", this.maxBulkDocs));
  rows.put(ElasticRestConstants.MAX_BULK_LENGTH, describeEntry(
      "Maximum size of the bulk in bytes.", this.maxBulkLength));
  rows.put(ElasticRestConstants.USER, describeEntry(
      "Username for auth credentials (only used when https is enabled)",
      this.user));
  // NOTE(review): the password is exposed in clear text here - confirm that
  // callers do not log the result of describe().
  rows.put(ElasticRestConstants.PASSWORD, describeEntry(
      "Password for auth credentials (only used when https is enabled)",
      this.password));
  rows.put(ElasticRestConstants.TYPE, describeEntry(
      "Default type to send documents to.", this.defaultType));
  rows.put(ElasticRestConstants.HTTPS, describeEntry(
      "true to enable https, false to disable https. If you've disabled http "
          + "access (by forcing https), be sure to set this to true, otherwise "
          + "you might get \"connection reset by peer\".", this.https));
  rows.put(ElasticRestConstants.HOSTNAME_TRUST, describeEntry(
      "true to trust elasticsearch server's certificate even if its listed "
          + "domain name does not match the domain they are hosted or false "
          + "to check if the elasticsearch server's certificate's listed "
          + "domain is the same domain that it is hosted on, and if "
          + "it doesn't, then fail to index (only used when https is enabled)",
      this.trustAllHostnames));

  final String languageList = this.languages == null
      ? ""
      : String.join(",", languages);
  rows.put(ElasticRestConstants.LANGUAGES, describeEntry(
      "A list of strings denoting the supported languages (e.g. en, de, fr, it). "
          + "If this value is empty all documents will be sent to index property. "
          + "If not empty the Rest client will distribute documents in different "
          + "indices based on their languages property. Indices are named with the "
          + "following schema: index separator language (e.g. nutch_de). "
          + "Entries with an unsupported languages value will be added to "
          + "index index separator sink (e.g. nutch_others).",
      languageList));
  rows.put(ElasticRestConstants.SEPARATOR, describeEntry(
      "Is used only if languages property is defined to build the index name "
          + "(i.e. index separator lang).", this.separator));
  rows.put(ElasticRestConstants.SINK, describeEntry(
      "Is used only if languages property is defined to build the index name "
          + "where to store documents with unsupported languages "
          + "(i.e. index separator sink).", this.sink));

  return rows;
}

/** Pairs a parameter description with the parameter's current value. */
private static Map.Entry<String, Object> describeEntry(String description,
    Object value) {
  return new AbstractMap.SimpleEntry<>(description, value);
}
/**
 * Stores the Hadoop configuration for later retrieval through
 * {@link #getConf()}.
 *
 * @param conf the configuration to keep
 */
@Override
public void setConf(Configuration conf) {
  this.config = conf;
}
/**
 * Returns the configuration previously handed to
 * {@link #setConf(Configuration)}.
 *
 * @return the stored configuration, or {@code null} if none was set
 */
@Override
public Configuration getConf() {
  return this.config;
}
/**
 * Builds the name of the index that holds documents of the given language.
 *
 * @param lang the language suffix
 * @return the default index name, the separator and {@code lang}, concatenated
 */
private String getLanguageIndexName(String lang) {
  final String indexPrefix = defaultIndex;
  return getComposedIndexName(indexPrefix, lang);
}
/**
 * Builds the name of the index that holds documents whose language is not
 * among the supported ones.
 *
 * @return the default index name, the separator and the sink suffix,
 *         concatenated
 */
private String getSinkIndexName() {
  final String indexPrefix = defaultIndex;
  final String sinkSuffix = sink;
  return getComposedIndexName(indexPrefix, sinkSuffix);
}
/**
 * Joins a prefix and a postfix with the configured separator.
 *
 * @param prefix the first part of the index name
 * @param postfix the last part of the index name
 * @return {@code prefix}, the separator and {@code postfix}, concatenated
 */
private String getComposedIndexName(String prefix, String postfix) {
  final StringBuilder composed = new StringBuilder();
  composed.append(prefix).append(separator).append(postfix);
  return composed.toString();
}
}