blob: 700913434303a92efc4faa6737773ded3c9f8413 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hive.jdbc;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.SSLZookeeperFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
import org.apache.hive.service.server.HS2ActivePassiveHARegistry;
import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient;
import org.apache.hive.service.server.HiveServer2Instance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
class ZooKeeperHiveClientHelper {
static final Logger LOG = LoggerFactory.getLogger(ZooKeeperHiveClientHelper.class);
// Pattern for key1=value1;key2=value2
private static final Pattern kvPattern = Pattern.compile("([^=;]*)=([^;]*)[;]?");
private static String getZooKeeperNamespace(JdbcConnectionParams connParams) {
String zooKeeperNamespace = connParams.getSessionVars().get(JdbcConnectionParams.ZOOKEEPER_NAMESPACE);
if ((zooKeeperNamespace == null) || (zooKeeperNamespace.isEmpty())) {
// if active passive HA enabled, use default HA namespace
if (isZkHADynamicDiscoveryMode(connParams.getSessionVars())) {
zooKeeperNamespace = JdbcConnectionParams.ZOOKEEPER_ACTIVE_PASSIVE_HA_DEFAULT_NAMESPACE;
} else {
zooKeeperNamespace = JdbcConnectionParams.ZOOKEEPER_DEFAULT_NAMESPACE;
}
}
return zooKeeperNamespace;
}
/**
* Returns true is only if HA service discovery mode is enabled
*
* @param sessionConf - session configuration
* @return true if serviceDiscoveryMode=zooKeeperHA is specified in JDBC URI
*/
public static boolean isZkHADynamicDiscoveryMode(Map<String, String> sessionConf) {
final String discoveryMode = sessionConf.get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE);
return (discoveryMode != null) &&
JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER_HA.equalsIgnoreCase(discoveryMode);
}
/**
* Returns true is any service discovery mode is enabled (HA or non-HA)
*
* @param sessionConf - session configuration
* @return true if serviceDiscoveryMode is specified in JDBC URI
*/
public static boolean isZkDynamicDiscoveryMode(Map<String, String> sessionConf) {
final String discoveryMode = sessionConf.get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE);
return (discoveryMode != null)
&& (JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER.equalsIgnoreCase(discoveryMode) ||
JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER_HA.equalsIgnoreCase(discoveryMode));
}
/**
* Parse and set up the SSL communication related Zookeeper params in connParams from sessionVars.
* @param connParams
*/
public static void setZkSSLParams(JdbcConnectionParams connParams) {
Map<String, String> sessionConf = connParams.getSessionVars();
boolean sslEnabled = false;
if (sessionConf.containsKey(JdbcConnectionParams.ZOOKEEPER_SSL_ENABLE)) {
sslEnabled = Boolean.parseBoolean(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_SSL_ENABLE));
connParams.setZooKeeperSslEnabled(sslEnabled);
}
if (sslEnabled) {
connParams.setZookeeperKeyStoreLocation(
StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_KEYSTORE_LOCATION), ""));
connParams.setZookeeperKeyStorePassword(
StringUtils.defaultString(Utils.getPassword(sessionConf, JdbcConnectionParams.ZOOKEEPER_KEYSTORE_PASSWORD),
""));
connParams.setZookeeperTrustStoreLocation(
StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_TRUSTSTORE_LOCATION), ""));
connParams.setZookeeperTrustStorePassword(
StringUtils.defaultString(Utils.getPassword(sessionConf, JdbcConnectionParams.ZOOKEEPER_TRUSTSTORE_PASSWORD),
""));
}
}
private static CuratorFramework getZkClient(JdbcConnectionParams connParams) throws Exception {
String zooKeeperEnsemble = connParams.getZooKeeperEnsemble();
CuratorFramework zooKeeperClient =
CuratorFrameworkFactory.builder()
.connectString(zooKeeperEnsemble)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.zookeeperFactory(
new SSLZookeeperFactory(connParams.isZooKeeperSslEnabled(), connParams.getZookeeperKeyStoreLocation(),
connParams.getZookeeperKeyStorePassword(), connParams.getZookeeperTrustStoreLocation(),
connParams.getZookeeperTrustStorePassword()))
.build();
zooKeeperClient.start();
return zooKeeperClient;
}
/**
* Get a list of all HiveServer2 server hosts.
*
* @param connParams The current JDBC connection parameters
* @param zooKeeperClient The client to use to connect to ZooKeeper
* @return A list of HiveServer2 hosts
* @throws ZooKeeperHiveClientException Failed to communicate to ZooKeeper
*/
private static List<String> getServerHosts(final JdbcConnectionParams connParams,
final CuratorFramework zooKeeperClient) throws ZooKeeperHiveClientException {
final String zookeeperNamespace = getZooKeeperNamespace(connParams);
final String zkPath = ZKPaths.makePath(null, zookeeperNamespace);
try {
final List<String> serverHosts = zooKeeperClient.getChildren().forPath(zkPath);
LOG.info("Discovered HiveServer2 hosts in ZooKeeper [{}]: {}", zkPath, serverHosts);
if (serverHosts.isEmpty()) {
LOG.warn("Did not find any HiveServer2 hosts in ZooKeeper [{}]. "
+ "Check that the Hive ZooKeeper namespace is configured correctly.", zkPath);
}
// Remove the znodes we've already tried from this list
serverHosts.removeAll(connParams.getRejectedHostZnodePaths());
LOG.debug("Servers in ZooKeeper after removing rejected: {}", serverHosts);
return serverHosts;
} catch (Exception e) {
throw new ZooKeeperHiveClientException("Unable to retrive HS2 host information from ZooKeeper", e);
}
}
private static void updateParamsWithZKServerNode(JdbcConnectionParams connParams,
CuratorFramework zooKeeperClient, String serverNode) throws Exception {
final String zooKeeperNamespace = getZooKeeperNamespace(connParams);
connParams.setCurrentHostZnodePath(serverNode);
// Read data from the znode for this server node
// This data could be either config string (new releases) or server end
// point (old releases)
String dataStr =
new String(
zooKeeperClient.getData().forPath(ZKPaths.makePath(null, zooKeeperNamespace, serverNode)),
StandardCharsets.UTF_8);
// If dataStr is not null and dataStr is not a KV pattern,
// it must be the server uri added by an older version HS2
Matcher matcher = kvPattern.matcher(dataStr);
if ((dataStr != null) && (!matcher.find())) {
String[] split = dataStr.split(":");
if (split.length != 2) {
throw new ZooKeeperHiveClientException("Unable to parse HiveServer2 URI from ZooKeeper data: " + dataStr);
}
connParams.setHost(split[0]);
connParams.setPort(Integer.parseInt(split[1]));
} else {
applyConfs(dataStr, connParams);
}
}
static void configureConnParams(JdbcConnectionParams connParams) throws ZooKeeperHiveClientException {
if (isZkHADynamicDiscoveryMode(connParams.getSessionVars())) {
configureConnParamsHA(connParams);
} else {
CuratorFramework zooKeeperClient = null;
try {
zooKeeperClient = getZkClient(connParams);
final List<String> serverHosts = getServerHosts(connParams, zooKeeperClient);
if (serverHosts.isEmpty()) {
throw new ZooKeeperHiveClientException("No more HiveServer2 URIs from ZooKeeper to attempt");
}
// Pick a server node randomly
final String serverNode = serverHosts.get(ThreadLocalRandom.current().nextInt(serverHosts.size()));
updateParamsWithZKServerNode(connParams, zooKeeperClient, serverNode);
} catch (ZooKeeperHiveClientException zkhce) {
throw zkhce;
} catch (Exception e) {
throw new ZooKeeperHiveClientException("Unable to read HiveServer2 configs from ZooKeeper", e);
} finally {
if (zooKeeperClient != null) {
zooKeeperClient.close();
}
}
}
}
private static void configureConnParamsHA(JdbcConnectionParams connParams) throws ZooKeeperHiveClientException {
try {
Configuration registryConf = new Configuration();
registryConf.set(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM.varname, connParams.getZooKeeperEnsemble());
registryConf.set(HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE.varname,
getZooKeeperNamespace(connParams));
HS2ActivePassiveHARegistry haRegistryClient = HS2ActivePassiveHARegistryClient.getClient(registryConf);
boolean foundLeader = false;
String maxRetriesConf = connParams.getSessionVars().get(JdbcConnectionParams.RETRIES);
final int maxRetries = StringUtils.isEmpty(maxRetriesConf) ? 5 : Integer.parseInt(maxRetriesConf);
int retries = 0;
int sleepMs = 1000;
while (!foundLeader && retries < maxRetries) {
for (HiveServer2Instance hiveServer2Instance : haRegistryClient.getAll()) {
if (hiveServer2Instance.isLeader()) {
foundLeader = true;
connParams.setHost(hiveServer2Instance.getHost());
connParams.setPort(hiveServer2Instance.getRpcPort());
final String mode = hiveServer2Instance.getTransportMode().equals("http") ? "http:/" + hiveServer2Instance
.getHttpEndpoint() : hiveServer2Instance.getTransportMode();
LOG.info("Found HS2 Active Host: {} Port: {} Identity: {} Mode: {}", hiveServer2Instance.getHost(),
hiveServer2Instance.getRpcPort(), hiveServer2Instance.getWorkerIdentity(), mode);
// configurations are always published to ServiceRecord. Read/apply configs to JDBC connection params
String serverConfStr = Joiner.on(';').withKeyValueSeparator("=").join(hiveServer2Instance.getProperties());
if (LOG.isDebugEnabled()) {
LOG.debug("Configurations applied to JDBC connection params. {}", hiveServer2Instance.getProperties());
}
applyConfs(serverConfStr, connParams);
break;
}
}
if (!foundLeader) {
LOG.warn("Unable to connect to HS2 Active Host (No Leader Found!). Retrying after {} ms. retries: {}",
sleepMs, retries);
Thread.sleep(sleepMs);
retries++;
}
}
if (!foundLeader) {
throw new ZooKeeperHiveClientException("Unable to connect to HiveServer2 Active host (No leader found!) after" +
" " + maxRetries + " retries.");
}
} catch (Exception e) {
throw new ZooKeeperHiveClientException("Unable to read HiveServer2 configs from ZooKeeper", e);
}
}
static List<JdbcConnectionParams> getDirectParamsList(JdbcConnectionParams connParams)
throws ZooKeeperHiveClientException {
CuratorFramework zooKeeperClient = null;
try {
zooKeeperClient = getZkClient(connParams);
final List<String> serverHosts = getServerHosts(connParams, zooKeeperClient);
if (serverHosts.isEmpty()) {
throw new ZooKeeperHiveClientException("No more HiveServer2 URIs from ZooKeeper to attempt");
}
final List<JdbcConnectionParams> directParamsList = new ArrayList<>(serverHosts.size());
for (final String serverNode : serverHosts) {
JdbcConnectionParams directConnParams = new JdbcConnectionParams(connParams);
directParamsList.add(directConnParams);
updateParamsWithZKServerNode(directConnParams, zooKeeperClient, serverNode);
}
return directParamsList;
} catch (Exception e) {
throw new ZooKeeperHiveClientException("Unable to read HiveServer2 configs from ZooKeeper", e);
} finally {
if (zooKeeperClient != null) {
zooKeeperClient.close();
}
}
}
/**
* Apply configs published by the server. Configs specified from client's JDBC URI override
* configs published by the server.
*
* @param serverConfStr
* @param connParams
* @throws Exception
*/
private static void applyConfs(String serverConfStr, JdbcConnectionParams connParams)
throws Exception {
Matcher matcher = kvPattern.matcher(serverConfStr);
while (matcher.find()) {
// Have to use this if-else since switch-case on String is supported Java 7 onwards
if ((matcher.group(1) != null)) {
if ((matcher.group(2) == null)) {
throw new Exception("Null config value for: " + matcher.group(1)
+ " published by the server.");
}
// Set host
if (matcher.group(1).equals("hive.server2.thrift.bind.host")) {
connParams.setHost(matcher.group(2));
}
// Set transportMode
if ((matcher.group(1).equals("hive.server2.transport.mode"))
&& !(connParams.getSessionVars().containsKey(JdbcConnectionParams.TRANSPORT_MODE))) {
connParams.getSessionVars().put(JdbcConnectionParams.TRANSPORT_MODE, matcher.group(2));
}
// Set port
if (matcher.group(1).equals("hive.server2.thrift.port")) {
connParams.setPort(Integer.parseInt(matcher.group(2)));
}
if ((matcher.group(1).equals("hive.server2.thrift.http.port"))
&& !(connParams.getPort() > 0)) {
connParams.setPort(Integer.parseInt(matcher.group(2)));
}
// Set sasl qop
if ((matcher.group(1).equals("hive.server2.thrift.sasl.qop"))
&& !(connParams.getSessionVars().containsKey(JdbcConnectionParams.AUTH_QOP))) {
connParams.getSessionVars().put(JdbcConnectionParams.AUTH_QOP, matcher.group(2));
}
// Set http path
if ((matcher.group(1).equals("hive.server2.thrift.http.path"))
&& !(connParams.getSessionVars().containsKey(JdbcConnectionParams.HTTP_PATH))) {
connParams.getSessionVars().put(JdbcConnectionParams.HTTP_PATH, matcher.group(2));
}
// Set SSL
if ((matcher.group(1) != null) && (matcher.group(1).equals("hive.server2.use.SSL"))
&& !(connParams.getSessionVars().containsKey(JdbcConnectionParams.USE_SSL))) {
connParams.getSessionVars().put(JdbcConnectionParams.USE_SSL, matcher.group(2));
}
/**
* Note: this is pretty messy, but sticking to the current implementation.
* Set authentication configs. Note that in JDBC driver, we have 3 auth modes: NOSASL,
* Kerberos (including delegation token mechanism) and password based.
* The use of JdbcConnectionParams.AUTH_TYPE==JdbcConnectionParams.AUTH_SIMPLE picks NOSASL.
* The presence of JdbcConnectionParams.AUTH_PRINCIPAL==<principal> picks Kerberos.
* If principal is absent, the presence of
* JdbcConnectionParams.AUTH_TYPE==JdbcConnectionParams.AUTH_TOKEN uses delegation token.
* Otherwise password based (which includes NONE, PAM, LDAP, CUSTOM)
*/
if (matcher.group(1).equals("hive.server2.authentication")) {
// NOSASL
if (matcher.group(2).equalsIgnoreCase("NOSASL")
&& !(connParams.getSessionVars().containsKey(JdbcConnectionParams.AUTH_TYPE) && connParams
.getSessionVars().get(JdbcConnectionParams.AUTH_TYPE)
.equalsIgnoreCase(JdbcConnectionParams.AUTH_SIMPLE))) {
connParams.getSessionVars().put(JdbcConnectionParams.AUTH_TYPE,
JdbcConnectionParams.AUTH_SIMPLE);
}
}
// KERBEROS
// If delegation token is passed from the client side, do not set the principal
if (matcher.group(1).equalsIgnoreCase("hive.server2.authentication.kerberos.principal")
&& !(connParams.getSessionVars().containsKey(JdbcConnectionParams.AUTH_TYPE) && connParams
.getSessionVars().get(JdbcConnectionParams.AUTH_TYPE)
.equalsIgnoreCase(JdbcConnectionParams.AUTH_TOKEN))
&& !(connParams.getSessionVars().containsKey(JdbcConnectionParams.AUTH_PRINCIPAL))) {
connParams.getSessionVars().put(JdbcConnectionParams.AUTH_PRINCIPAL, matcher.group(2));
}
}
}
}
}