blob: 4e0b2fe142f9fcf268a7b602b97053792713933c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.elasticsearch.client;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import javax.net.ssl.SSLContext;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.metron.elasticsearch.config.ElasticsearchClientConfig;
import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
import org.apache.metron.elasticsearch.utils.ElasticsearchUtils.HostnamePort;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Main entry point to create the ES client.
*/
public class ElasticsearchClientFactory {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String ES_SETTINGS_KEY = "es.client.settings"; // es config key in global config
/**
* Creates an Elasticsearch client from settings provided via the global config.
*
* @return new client
*/
public static ElasticsearchClient create(Map<String, Object> globalConfig) {
ElasticsearchClientConfig esClientConfig = new ElasticsearchClientConfig(
getEsSettings(globalConfig));
HttpHost[] httpHosts = getHttpHosts(globalConfig, esClientConfig.getConnectionScheme());
RestClientBuilder builder = RestClient.builder(httpHosts);
builder.setRequestConfigCallback(reqConfigBuilder -> {
// Modifies request config builder with connection and socket timeouts.
// https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/_timeouts.html
reqConfigBuilder.setConnectTimeout(esClientConfig.getConnectTimeoutMillis());
reqConfigBuilder.setSocketTimeout(esClientConfig.getSocketTimeoutMillis());
return reqConfigBuilder;
});
builder.setMaxRetryTimeoutMillis(esClientConfig.getMaxRetryTimeoutMillis());
builder.setHttpClientConfigCallback(clientBuilder -> {
clientBuilder.setDefaultIOReactorConfig(getIOReactorConfig(esClientConfig));
clientBuilder.setDefaultCredentialsProvider(getCredentialsProvider(esClientConfig));
clientBuilder.setSSLContext(getSSLContext(esClientConfig));
return clientBuilder;
});
RestClient lowLevelClient = builder.build();
RestHighLevelClient client = new RestHighLevelClient(lowLevelClient);
return new ElasticsearchClient(lowLevelClient, client);
}
private static Map<String, Object> getEsSettings(Map<String, Object> globalConfig) {
return (Map<String, Object>) globalConfig.getOrDefault(ES_SETTINGS_KEY, new HashMap<>());
}
private static HttpHost[] getHttpHosts(Map<String, Object> globalConfiguration, String scheme) {
List<HostnamePort> hps = ElasticsearchUtils.getIps(globalConfiguration);
HttpHost[] httpHosts = new HttpHost[hps.size()];
int i = 0;
for (HostnamePort hp : hps) {
httpHosts[i++] = new HttpHost(hp.hostname, hp.port, scheme);
}
return httpHosts;
}
/**
* Creates config with setting for num connection threads. Default is ES client default,
* which is 1 to num processors per the documentation.
* https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/_number_of_threads.html
*/
private static IOReactorConfig getIOReactorConfig(ElasticsearchClientConfig esClientConfig) {
if (esClientConfig.getNumClientConnectionThreads().isPresent()) {
Integer numThreads = esClientConfig.getNumClientConnectionThreads().get();
LOG.info("Setting number of client connection threads: {}", numThreads);
return IOReactorConfig.custom().setIoThreadCount(numThreads).build();
} else {
return IOReactorConfig.DEFAULT;
}
}
private static CredentialsProvider getCredentialsProvider(
ElasticsearchClientConfig esClientConfig) {
Optional<Entry<String, String>> credentials = esClientConfig.getCredentials();
if (credentials.isPresent()) {
LOG.info(
"Found auth credentials - setting up user/pass authenticated client connection for ES.");
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
UsernamePasswordCredentials upcredentials = new UsernamePasswordCredentials(
credentials.get().getKey(), credentials.get().getValue());
credentialsProvider.setCredentials(AuthScope.ANY, upcredentials);
return credentialsProvider;
} else {
LOG.info(
"Elasticsearch client credentials not provided. Defaulting to non-authenticated client connection.");
return null;
}
}
/**
* <p>Setup connection encryption details (SSL) if applicable.
* If ssl.enabled=true, sets up SSL connection. If enabled, keystore.path is required. User can
* also optionally set keystore.password and keystore.type.
* https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/_encrypted_communication.html
* <p>
* <p>Other guidance on the HTTP Component library and configuring SSL connections.
* http://www.robinhowlett.com/blog/2016/01/05/everything-you-ever-wanted-to-know-about-ssl-but-were-afraid-to-ask.
* <p>
* <p>JSSE docs - https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html
* <p>
* <p>Additional guidance for configuring Elasticsearch for SSL can be found here - https://www.elastic.co/guide/en/x-pack/5.6/ssl-tls.html
*/
private static SSLContext getSSLContext(ElasticsearchClientConfig esClientConfig) {
if (esClientConfig.isSSLEnabled()) {
LOG.info("Configuring client for SSL connection.");
if (!esClientConfig.getKeyStorePath().isPresent()) {
throw new IllegalStateException("KeyStore path must be provided for SSL connection.");
}
Optional<String> optKeyStorePass = esClientConfig.getKeyStorePassword();
char[] keyStorePass = optKeyStorePass.map(String::toCharArray).orElse(null);
KeyStore trustStore = getStore(esClientConfig.getKeyStoreType(),
esClientConfig.getKeyStorePath().get(), keyStorePass);
try {
SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(trustStore, null);
return sslBuilder.build();
} catch (NoSuchAlgorithmException | KeyStoreException | KeyManagementException e) {
throw new IllegalStateException("Unable to load truststore.", e);
}
}
return null;
}
private static KeyStore getStore(String type, Path path, char[] pass) {
KeyStore store;
try {
store = KeyStore.getInstance(type);
} catch (KeyStoreException e) {
throw new IllegalStateException("Unable to get keystore type '" + type + "'", e);
}
try (InputStream is = Files.newInputStream(path)) {
store.load(is, pass);
} catch (IOException | NoSuchAlgorithmException | CertificateException e) {
throw new IllegalStateException("Unable to load keystore from path '" + path + "'", e);
}
return store;
}
}