// blob: 78c338bdad5ae12f281d5a555ba661cdeb6cca52 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ranger.elasticsearch;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthSchemeProvider;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.AuthSchemes;
import org.apache.http.config.Lookup;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.impl.auth.SPNegoSchemeFactory;
import org.apache.log4j.Logger;
import org.apache.ranger.audit.destination.ElasticSearchAuditDestination;
import org.apache.ranger.audit.provider.MiscUtil;
import org.apache.ranger.authorization.credutils.CredentialsProviderUtil;
import org.apache.ranger.authorization.credutils.kerberos.KerberosCredentialsProvider;
import org.apache.ranger.common.PropertiesUtil;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.stereotype.Component;
import javax.security.auth.Subject;
import javax.security.auth.kerberos.KerberosTicket;
import java.io.File;
import java.security.PrivilegedActionException;
import java.util.Date;
import java.util.Locale;
import static org.apache.ranger.audit.destination.ElasticSearchAuditDestination.*;
/**
 * Creates, caches and hands out the ElasticSearch {@link RestHighLevelClient}.
 *
 * <p>Connection settings (urls, protocol, user, password/keytab, port, index) are read
 * through {@link PropertiesUtil} the first time a client is requested. When the configured
 * password is a path to an existing keytab file, a Kerberos login is performed and the
 * resulting {@link Subject} is kept so that {@link #getClient()} can react to ticket expiry.
 */
@Component
public class ElasticSearchMgr {
    private static final Logger logger = Logger.getLogger(ElasticSearchMgr.class);

    /** Port used when no port property is configured (ElasticSearch's default HTTP port). */
    private static final String DEFAULT_PORT = "9200";

    /** Name of the audit index; populated by {@link #connect()}. */
    public String index;

    /**
     * Kerberos subject obtained during {@link #connect()}; stays {@code null} when no keytab login
     * was performed. Volatile because {@link #getClient()} reads it without holding a lock.
     */
    volatile Subject subject;

    String user;
    String password;

    /**
     * Cached client; {@code null} until {@link #connect()} succeeds. Volatile because
     * {@link #getClient()} reads it without holding the locks that {@link #connect()} writes it under.
     */
    volatile RestHighLevelClient client = null;

    /**
     * Builds the client from the configured properties if none exists yet.
     *
     * <p>Failures (missing URLs, unparsable port, login or builder errors) are logged and leave
     * {@link #client} as {@code null}; nothing is thrown to the caller.
     */
    synchronized void connect() {
        if (client != null) {
            return;
        }
        synchronized (ElasticSearchAuditDestination.class) {
            // Re-check after taking the second lock: another holder may have created the client.
            if (client != null) {
                return;
            }

            String urls = PropertiesUtil.getProperty(CONFIG_PREFIX + "." + CONFIG_URLS);
            String protocol = PropertiesUtil.getProperty(CONFIG_PREFIX + "." + CONFIG_PROTOCOL, "http");
            user = PropertiesUtil.getProperty(CONFIG_PREFIX + "." + CONFIG_USER, "");
            password = PropertiesUtil.getProperty(CONFIG_PREFIX + "." + CONFIG_PWRD, "");
            // Kept as text here; it is parsed inside the try below so that a malformed value is
            // reported like any other connection failure instead of escaping as an exception.
            String portValue = PropertiesUtil.getProperty(CONFIG_PREFIX + "." + CONFIG_PORT, DEFAULT_PORT);
            this.index = PropertiesUtil.getProperty(CONFIG_PREFIX + "." + CONFIG_INDEX, "ranger_audits");

            // The password is deliberately left out of the logged parameters.
            String parameterString = String.format(Locale.ROOT, "User:%s, %s://%s:%s/%s", user, protocol, urls, portValue, index);
            logger.info("Initializing ElasticSearch " + parameterString);

            if (urls != null) {
                urls = urls.trim();
            }
            if (StringUtils.isBlank(urls) || "NONE".equalsIgnoreCase(urls)) {
                logger.info(String.format("Clearing URI config value: %s", urls));
                // Without any host there is nothing to build a client from; stop here rather
                // than relying on the builder to reject an empty host list.
                logger.fatal("Can't connect to ElasticSearch, no URLs configured: " + parameterString);
                return;
            }

            try {
                int port = Integer.parseInt(StringUtils.trim(portValue));
                if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password) && isExistingKeytab(password)) {
                    subject = CredentialsProviderUtil.login(user, password);
                }
                RestClientBuilder restClientBuilder = getRestClientBuilder(urls, protocol, user, password, port);
                client = new RestHighLevelClient(restClientBuilder);
            } catch (Throwable t) {
                // Best effort by design: the failure is logged and the client stays null.
                logger.fatal("Can't connect to ElasticSearch: " + parameterString, t);
            }
        }
    }

    /**
     * Creates a {@link RestClientBuilder} for the given hosts and configures its authentication.
     *
     * <p>If {@code user} and {@code password} are both non-blank and neither is {@code "NONE"}:
     * SPNEGO/Kerberos credentials are used when {@code password} is a path to an existing keytab
     * file, basic credentials otherwise. In every other case an error is logged and a
     * {@code null} credentials provider is installed.
     *
     * @param urls     comma-separated host names
     * @param protocol scheme used for every host (for example {@code http})
     * @param user     user name or Kerberos principal
     * @param password password, or path to a keytab file
     * @param port     port used for every host
     * @return the configured builder
     */
    public static RestClientBuilder getRestClientBuilder(String urls, String protocol, String user, String password, int port) {
        HttpHost[] hosts = MiscUtil.toArray(urls, ",").stream()
                .map(host -> new HttpHost(host, port, protocol))
                .toArray(HttpHost[]::new);
        RestClientBuilder restClientBuilder = RestClient.builder(hosts);

        boolean credentialsGiven = StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password)
                && !user.equalsIgnoreCase("NONE") && !password.equalsIgnoreCase("NONE");

        if (!credentialsGiven) {
            logger.error("ElasticSearch Credentials not provided!!");
            final CredentialsProvider credentialsProvider = null;
            restClientBuilder.setHttpClientConfigCallback(clientBuilder ->
                    clientBuilder.setDefaultCredentialsProvider(credentialsProvider));
            return restClientBuilder;
        }

        if (isExistingKeytab(password)) {
            final KerberosCredentialsProvider credentialsProvider =
                    CredentialsProviderUtil.getKerberosCredentials(user, password);
            Lookup<AuthSchemeProvider> authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create()
                    .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory()).build();
            restClientBuilder.setHttpClientConfigCallback(clientBuilder -> {
                clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                clientBuilder.setDefaultAuthSchemeRegistry(authSchemeRegistry);
                return clientBuilder;
            });
        } else {
            final CredentialsProvider credentialsProvider =
                    CredentialsProviderUtil.getBasicCredentials(user, password);
            restClientBuilder.setHttpClientConfigCallback(clientBuilder ->
                    clientBuilder.setDefaultCredentialsProvider(credentialsProvider));
        }
        return restClientBuilder;
    }

    /**
     * Returns the cached client, creating it on first use.
     *
     * <p>When a Kerberos subject is held, its TGT is inspected first: if the ticket is missing or
     * past its end time the client is discarded and rebuilt; if
     * {@code CredentialsProviderUtil.ticketWillExpire} reports {@code true} a fresh login replaces
     * the subject.
     *
     * @return the client, or {@code null} if the connection could not be set up
     * @throws RuntimeException wrapping a {@link PrivilegedActionException} raised while
     *                          refreshing the Kerberos login
     */
    public RestHighLevelClient getClient() {
        if (client == null || subject == null) {
            // No Kerberos state to refresh; connect() does nothing when a client already exists.
            connect();
            return client;
        }

        KerberosTicket ticket = CredentialsProviderUtil.getTGT(subject);
        try {
            if (isMissingOrExpired(ticket)) {
                // NOTE(review): the previous client instance is dropped without close();
                // confirm whether it should be closed once in-flight requests have finished.
                client = null;
                CredentialsProviderUtil.ticketExpireTime80 = 0;
                connect();
            } else if (CredentialsProviderUtil.ticketWillExpire(ticket)) {
                subject = CredentialsProviderUtil.login(user, password);
            }
        } catch (PrivilegedActionException e) {
            logger.error("PrivilegedActionException:", e);
            throw new RuntimeException(e);
        }
        return client;
    }

    /** Tells whether {@code password} looks like a keytab path and that file exists. */
    private static boolean isExistingKeytab(String password) {
        return password.contains("keytab") && new File(password).exists();
    }

    /**
     * Tells whether the ticket can no longer be used: it is absent, has no end time,
     * or its end time lies in the past.
     */
    private static boolean isMissingOrExpired(KerberosTicket ticket) {
        if (ticket == null) {
            return true;
        }
        Date endTime = ticket.getEndTime();
        return endTime == null || new Date().after(endTime);
    }
}