SENTRY-1580: Provide pooled client connection model with HA
diff --git a/pom.xml b/pom.xml
index ad54cfd..ba13a0d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,7 +60,7 @@
<build.helper.maven.plugin.version>1.8</build.helper.maven.plugin.version>
<cglib.version>2.2</cglib.version>
<commons-cli.version>1.2</commons-cli.version>
- <commons-pool2.version>2.2</commons-pool2.version>
+ <commons-pool2.version>2.4.2</commons-pool2.version>
<commons.lang.version>2.6</commons.lang.version>
<commons.logging.version>1.2</commons.logging.version>
<curator.version>2.11.1</curator.version>
diff --git a/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/hive/authz/SentryConfigTool.java b/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/hive/authz/SentryConfigTool.java
index a3140f2..8a5085b 100644
--- a/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/hive/authz/SentryConfigTool.java
+++ b/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/hive/authz/SentryConfigTool.java
@@ -291,23 +291,27 @@
Map<String, Map<String, Set<String>>> policyFileMappingData = sentryPolicyFileFormatter.parse(
importPolicyFilePath, authzConf);
// todo: here should be an validator to check the data's value, format, hierarchy
- SentryPolicyServiceClient client = SentryServiceClientFactory.create(getAuthzConf());
- // import the mapping data to database
- client.importPolicy(policyFileMappingData, requestorUserName, importOverwriteRole);
+ try(SentryPolicyServiceClient client =
+ SentryServiceClientFactory.create(getAuthzConf())) {
+ // import the mapping data to database
+ client.importPolicy(policyFileMappingData, requestorUserName, importOverwriteRole);
+ }
}
// export the sentry mapping data to file
public void exportPolicy() throws Exception {
String requestorUserName = System.getProperty("user.name", "");
- SentryPolicyServiceClient client = SentryServiceClientFactory.create(getAuthzConf());
- // export the sentry mapping data from database to map structure
- Map<String, Map<String, Set<String>>> policyFileMappingData = client
- .exportPolicy(requestorUserName, objectPath);
- // get the FileFormatter according to the configuration
- SentryPolicyFileFormatter sentryPolicyFileFormatter = SentryPolicyFileFormatFactory
- .createFileFormatter(authzConf);
- // write the sentry mapping data to exportPolicyFilePath with the data in map structure
- sentryPolicyFileFormatter.write(exportPolicyFilePath, policyFileMappingData);
+ try (SentryPolicyServiceClient client =
+ SentryServiceClientFactory.create(getAuthzConf())) {
+ // export the sentry mapping data from database to map structure
+ Map<String, Map<String, Set<String>>> policyFileMappingData = client
+ .exportPolicy(requestorUserName, objectPath);
+ // get the FileFormatter according to the configuration
+ SentryPolicyFileFormatter sentryPolicyFileFormatter = SentryPolicyFileFormatFactory
+ .createFileFormatter(authzConf);
+ // write the sentry mapping data to exportPolicyFilePath with the data in map structure
+ sentryPolicyFileFormatter.write(exportPolicyFilePath, policyFileMappingData);
+ }
}
// list permissions for given user
diff --git a/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListenerBase.java b/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListenerBase.java
index 262db11..2abdd53 100644
--- a/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListenerBase.java
+++ b/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListenerBase.java
@@ -356,11 +356,11 @@
throws SentryUserException, IOException, MetaException {
String requestorUserName = UserGroupInformation.getCurrentUser()
.getShortUserName();
- SentryPolicyServiceClient sentryClient = getSentryServiceClient();
- sentryClient.dropPrivileges(requestorUserName, authorizableTable);
-
- // Close the connection after dropping privileges is done.
- sentryClient.close();
+ try(SentryPolicyServiceClient sentryClient = getSentryServiceClient()) {
+ sentryClient.dropPrivileges(requestorUserName, authorizableTable);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
private void renameSentryTablePrivilege(String oldDbName, String oldTabName,
@@ -379,9 +379,7 @@
if (!oldTabName.equalsIgnoreCase(newTabName)
&& syncWithPolicyStore(AuthzConfVars.AUTHZ_SYNC_ALTER_WITH_POLICY_STORE)) {
- SentryPolicyServiceClient sentryClient = getSentryServiceClient();
-
- try {
+ try (SentryPolicyServiceClient sentryClient = getSentryServiceClient()){
String requestorUserName = UserGroupInformation.getCurrentUser()
.getShortUserName();
sentryClient.renamePrivileges(requestorUserName, oldAuthorizableTable, newAuthorizableTable);
@@ -392,10 +390,8 @@
+ " Error: " + e.getMessage());
} catch (IOException e) {
throw new MetaException("Failed to find local user " + e.getMessage());
- } finally {
-
- // Close the connection after renaming privileges is done.
- sentryClient.close();
+ } catch (Exception e) {
+ e.printStackTrace();
}
}
// The HDFS plugin needs to know if it's a path change (set location)
diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/hadoop/hive/ql/exec/SentryGrantRevokeTask.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/hadoop/hive/ql/exec/SentryGrantRevokeTask.java
index da587d0..3f06cae 100644
--- a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/hadoop/hive/ql/exec/SentryGrantRevokeTask.java
+++ b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/hadoop/hive/ql/exec/SentryGrantRevokeTask.java
@@ -98,7 +98,6 @@
private static final int terminator = Utilities.newLineCode;
private static final long serialVersionUID = -7625118066790571999L;
- private SentryPolicyServiceClient sentryClient;
private HiveConf conf;
private HiveAuthzBinding hiveAuthzBinding;
private HiveAuthzConf authzConf;
@@ -116,13 +115,8 @@
@Override
public int execute(DriverContext driverContext) {
- try {
- try {
- this.sentryClient = SentryServiceClientFactory.create(authzConf);
- } catch (Exception e) {
- String msg = "Error creating Sentry client: " + e.getMessage();
- throw new RuntimeException(msg, e);
- }
+ try (SentryPolicyServiceClient sentryClient =
+ SentryServiceClientFactory.create(authzConf)) {
Preconditions.checkNotNull(hiveAuthzBinding, "HiveAuthzBinding cannot be null");
Preconditions.checkNotNull(authzConf, "HiveAuthConf cannot be null");
Preconditions.checkNotNull(subject, "Subject cannot be null");
@@ -179,9 +173,6 @@
console.printError(msg);
return RETURN_CODE_FAILURE;
} finally {
- if (sentryClient != null) {
- sentryClient.close();
- }
if (hiveAuthzBinding != null) {
hiveAuthzBinding.close();
}
diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListener.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListener.java
index fdb6df4..3ec2eed 100644
--- a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListener.java
+++ b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListener.java
@@ -364,11 +364,12 @@
throws SentryUserException, IOException, MetaException {
String requestorUserName = UserGroupInformation.getCurrentUser()
.getShortUserName();
- SentryPolicyServiceClient sentryClient = getSentryServiceClient();
- sentryClient.dropPrivileges(requestorUserName, authorizableTable);
-
- // Close the connection after dropping privileges is done.
- sentryClient.close();
+ try (SentryPolicyServiceClient sentryClient = SentryServiceClientFactory.create(authzConf)) {
+ sentryClient.dropPrivileges(requestorUserName, authorizableTable);
+ } catch (Exception e) {
+ throw new MetaException("Failed to connect to Sentry service "
+ + e.getMessage());
+ }
}
private void renameSentryTablePrivilege(String oldDbName, String oldTabName,
@@ -403,7 +404,11 @@
} finally {
// Close the connection after renaming privileges is done.
- sentryClient.close();
+ try {
+ sentryClient.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
}
// The HDFS plugin needs to know if it's a path change (set location)
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
index 4851114..f5d4431 100644
--- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
@@ -365,9 +365,7 @@
}
private <T> T execute(Command<T> cmd) throws KafkaException {
- SentryGenericServiceClient client = null;
- try {
- client = getClient();
+ try (SentryGenericServiceClient client = getClient()){
return cmd.run(client);
} catch (SentryUserException ex) {
String msg = "Unable to excute command on sentry server: " + ex.getMessage();
@@ -377,10 +375,6 @@
String msg = "Unable to obtain client:" + ex.getMessage();
LOG.error(msg, ex);
throw new KafkaException(msg, ex);
- } finally {
- if (client != null) {
- client.close();
- }
}
}
diff --git a/sentry-binding/sentry-binding-solr/src/main/java/org/apache/sentry/binding/solr/authz/SolrAuthzBinding.java b/sentry-binding/sentry-binding-solr/src/main/java/org/apache/sentry/binding/solr/authz/SolrAuthzBinding.java
index 2400673..37adb56 100644
--- a/sentry-binding/sentry-binding-solr/src/main/java/org/apache/sentry/binding/solr/authz/SolrAuthzBinding.java
+++ b/sentry-binding/sentry-binding-solr/src/main/java/org/apache/sentry/binding/solr/authz/SolrAuthzBinding.java
@@ -298,9 +298,7 @@
if (!isSyncEnabled()) {
return;
}
- SentryGenericServiceClient client = null;
- try {
- client = getClient();
+ try (SentryGenericServiceClient client = getClient()) {
TSentryPrivilege tPrivilege = new TSentryPrivilege();
tPrivilege.setComponent(AuthorizationComponent.Search);
tPrivilege.setServiceName(authzConf.get(SENTRY_SEARCH_SERVICE_KEY,
@@ -316,10 +314,6 @@
" can't delete privileges for collection " + collection);
} catch (Exception ex) {
throw new SentrySolrAuthorizationException("Unable to obtain client:" + ex.getMessage());
- } finally {
- if (client != null) {
- client.close();
- }
}
}
}
diff --git a/sentry-binding/sentry-binding-sqoop/src/main/java/org/apache/sentry/sqoop/binding/SqoopAuthBinding.java b/sentry-binding/sentry-binding-sqoop/src/main/java/org/apache/sentry/sqoop/binding/SqoopAuthBinding.java
index 84a61cc..11e2aa4 100644
--- a/sentry-binding/sentry-binding-sqoop/src/main/java/org/apache/sentry/sqoop/binding/SqoopAuthBinding.java
+++ b/sentry-binding/sentry-binding-sqoop/src/main/java/org/apache/sentry/sqoop/binding/SqoopAuthBinding.java
@@ -408,9 +408,7 @@
}
private <T> T execute(Command<T> cmd) throws SqoopException {
- SentryGenericServiceClient client = null;
- try {
- client = getClient();
+ try (SentryGenericServiceClient client = getClient()){
return cmd.run(client);
} catch (SentryUserException ex) {
String msg = "Unable to excute command on sentry server: " + ex.getMessage();
@@ -420,10 +418,6 @@
String msg = "Unable to obtain client:" + ex.getMessage();
LOG.error(msg, ex);
throw new SqoopException(SecurityError.AUTH_0014, msg, ex);
- } finally {
- if (client != null) {
- client.close();
- }
}
}
}
diff --git a/sentry-core/sentry-core-common/pom.xml b/sentry-core/sentry-core-common/pom.xml
index e1be256..2e7f5c5 100644
--- a/sentry-core/sentry-core-common/pom.xml
+++ b/sentry-core/sentry-core-common/pom.xml
@@ -66,6 +66,10 @@
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-pool2</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
index 34a594e..112f050 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
@@ -25,7 +25,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -46,17 +45,17 @@
* <p>
*/
-public class RetryClientInvocationHandler extends SentryClientInvocationHandler {
+public final class RetryClientInvocationHandler extends SentryClientInvocationHandler {
private static final Logger LOGGER =
LoggerFactory.getLogger(RetryClientInvocationHandler.class);
- private SentryServiceClient client = null;
+ private SentryConnection client = null;
private final int maxRetryCount;
/**
* Initialize the sentry configurations, including rpc retry count and client connection
* configs for SentryPolicyServiceClientDefaultImpl
*/
- public RetryClientInvocationHandler(Configuration conf, SentryServiceClient clientObject,
+ public RetryClientInvocationHandler(Configuration conf, SentryConnection clientObject,
SentryClientTransportConfigInterface transportConfig) {
Preconditions.checkNotNull(conf, "Configuration object cannot be null");
Preconditions.checkNotNull(clientObject, "Client Object cannot be null");
@@ -72,76 +71,78 @@
* resend the thrift call) no more than rpcRetryTotal times. Throw SentryUserException
* if failed retry after rpcRetryTotal times.
* if it is failed with other exception, method would just re-throw the exception.
- * Synchronized it for thread safety.
*/
@Override
public synchronized Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception {
- int retryCount = 0;
Exception lastExc = null;
- while (retryCount < maxRetryCount) {
- // Connect to a sentry server if not connected yet.
- try {
- client.connect();
- } catch (IOException e) {
- // Increase the retry num
- // Retry when the exception is caused by connection problem.
- retryCount++;
- lastExc = e;
- close();
- continue;
- }
+ for (int retryCount = 0; retryCount < maxRetryCount; retryCount++) {
+ connect();
// do the thrift call
try {
+ LOGGER.debug("Calling client {}", method.getName());
return method.invoke(client, args);
} catch (InvocationTargetException e) {
// Get the target exception, check if SentryUserException or TTransportException is wrapped.
// TTransportException means there is a connection problem.
+ LOGGER.error("failed to execute {}", method.getName(), e);
Throwable targetException = e.getCause();
- if (targetException instanceof SentryUserException ||
- targetException instanceof SentryHdfsServiceException) {
- Throwable sentryTargetException = targetException.getCause();
- // If there has connection problem, eg, invalid connection if the service restarted,
- // sentryTargetException instanceof TTransportException will be true.
- if (sentryTargetException instanceof TTransportException) {
- // Retry when the exception is caused by connection problem.
- lastExc = new TTransportException(sentryTargetException);
- LOGGER.error("Thrift call failed with TTransportException", lastExc);
- // Closing the thrift client on TTransportException. New client object is
- // created using new socket when an attempt to reconnect is made.
- close();
- } else {
- // The exception is thrown by thrift call, eg, SentryAccessDeniedException.
- // Do not need to reconnect to the sentry server.
- if (targetException instanceof SentryUserException) {
- throw (SentryUserException) targetException;
- } else {
- throw (SentryHdfsServiceException) targetException;
- }
- }
- } else {
+ if (!((targetException instanceof SentryUserException) ||
+ (targetException instanceof SentryHdfsServiceException))) {
throw e;
}
+ Throwable sentryTargetException = targetException.getCause();
+ // If there has connection problem, eg, invalid connection if the service restarted,
+ // sentryTargetException instanceof TTransportException will be true.
+ if (sentryTargetException instanceof TTransportException) {
+ // Retry when the exception is caused by connection problem.
+ lastExc = new TTransportException(sentryTargetException);
+ LOGGER.error("Thrift call failed", lastExc);
+ // The connection to the server is bad, inform the client of the problem
+ client.invalidate();
+ } else {
+ // Semantic exception which does not indicate the connection failure.
+ // Do not need to reconnect to the sentry server.
+ if (targetException instanceof SentryUserException) {
+ throw (SentryUserException) targetException;
+ } else {
+ throw (SentryHdfsServiceException) targetException;
+ }
+ }
}
-
- // Increase the retry num
- retryCount++;
}
// Throw the exception as reaching the max rpc retry num.
String error = String.format("Request failed, %d retries attempted ", maxRetryCount);
- LOGGER.error(error, lastExc);
throw new SentryUserException(error, lastExc);
}
+ /**
+ * Connect the client, retry multiple times
+ * @throws Exception
+ */
+ private void connect() throws Exception {
+ Exception lastExc = null;
+ for (int retryCount = 0; retryCount < maxRetryCount; retryCount++) {
+ try {
+ client.connect();
+ return;
+ } catch (Exception e) {
+ // Increase the retry num
+ // Retry when the exception is caused by connection problem.
+ LOGGER.error("failed to connect", e);
+ retryCount++;
+ lastExc = e;
+ }
+ }
+ assert lastExc != null;
+ throw lastExc;
+ }
+
@Override
public synchronized void close() {
- try {
- LOGGER.debug("Releasing the current client connection");
- client.disconnect();
- } catch (Exception e) {
- LOGGER.error("Encountered failure while closing the connection");
- }
+ //We are done with this client
+ client.done();
}
}
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
index 9ea7185..ad45305 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,6 +17,7 @@
*/
package org.apache.sentry.core.common.transport;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.core.common.exception.MissingConfigurationException;
@@ -29,14 +30,6 @@
* the transport configuration.
*/
interface SentryClientTransportConfigInterface {
- /**
- * @param conf configuration
- * @return number of times client retry logic should iterate through all
- * the servers before giving up.
- * @throws MissingConfigurationException if property is mandatory and is missing in
- * configuration.
- */
- int getSentryFullRetryTotal(Configuration conf) throws MissingConfigurationException;
/**
* @param conf configuration
@@ -52,7 +45,7 @@
* @throws MissingConfigurationException if property is mandatory and is missing in
* configuration.
*/
- boolean isKerberosEnabled(Configuration conf) throws MissingConfigurationException;
+ boolean isKerberosEnabled(Configuration conf);
/**
* @param conf configuration
@@ -61,7 +54,7 @@
* @throws MissingConfigurationException if property is mandatory and is missing in
* configuration.
*/
- boolean useUserGroupInformation(Configuration conf) throws MissingConfigurationException;
+ boolean useUserGroupInformation(Configuration conf);
/**
* @param conf configuration
@@ -69,7 +62,7 @@
* @throws MissingConfigurationException if property is mandatory and is missing in
* configuration.
*/
- String getSentryPrincipal(Configuration conf) throws MissingConfigurationException;
+ String getSentryPrincipal(Configuration conf);
/**
* Port in RPC Addresses configured is optional
@@ -78,7 +71,7 @@
* @throws MissingConfigurationException if property is mandatory and is missing in
* configuration.
*/
- String getSentryServerRpcAddress(Configuration conf) throws MissingConfigurationException;
+ String getSentryServerRpcAddress(Configuration conf);
/**
* Port in RPC Addresses configured is optional. If a port is not provided for a server
@@ -88,7 +81,7 @@
* @throws MissingConfigurationException if property is mandatory and is missing in
* configuration.
*/
- int getServerRpcPort(Configuration conf) throws MissingConfigurationException;
+ int getServerRpcPort(Configuration conf);
/**
* @param conf configuration
@@ -99,14 +92,59 @@
* @throws MissingConfigurationException if property is mandatory and is missing in
* configuration.
*/
- int getServerRpcConnTimeoutInMs(Configuration conf) throws MissingConfigurationException;
+ int getServerRpcConnTimeoutInMs(Configuration conf);
/**
- *
+ * Maximum number of connections in the pool.
+ * See {@link org.apache.commons.pool2.impl.GenericObjectPoolConfig#setMaxTotal(int)}
* @param conf configuration
- * @return True if the client should load balance connections between multiple servers
- * @throws MissingConfigurationException if property is mandatory and is missing in
- * configuration.
+ * @return maximum number of connection objects in the pool
*/
- boolean isLoadBalancingEnabled(Configuration conf)throws MissingConfigurationException;
+ int getPoolMaxTotal(Configuration conf);
+
+ /**
+ * Minimum number of idle obects on the pool.
+ * See {@link org.apache.commons.pool2.impl.GenericObjectPoolConfig#setMinIdle(int)}
+ * @param conf Configuration
+ * @return Minimum idle connections to keep in the pool
+ */
+ int getPoolMinIdle(Configuration conf);
+
+ /**
+ * Maximum number of idle connections in the pool.
+ * See {@link org.apache.commons.pool2.impl.GenericObjectPoolConfig#setMaxIdle(int)}
+ * @param conf Configuration
+ * @return Maximum number of idle connections in the pool
+ */
+ int getPoolMaxIdle(Configuration conf);
+
+ /**
+ * This is the minimum amount of time an object may sit idle in the pool
+ * before it is eligible for eviction.
+ * See {@link org.apache.commons.pool2.impl.GenericObjectPoolConfig#setMinEvictableIdleTimeMillis}
+ * @param conf Configuration
+ * @return The value for the pool minimum eviction time.
+ */
+ long getMinEvictableTimeSec(Configuration conf);
+
+ /**
+ * The number of seconds to sleep between runs of the idle object evictor thread.
+ * When non-positive, no idle object evictor thread will be run.
+ * See {@link GenericObjectPoolConfig#getTimeBetweenEvictionRunsMillis()}
+ * @param conf Configuration
+ * @return The number of seconds to sleep between runs of the idle object evictor thread.
+ */
+ long getTimeBetweenEvictionRunsSec(Configuration conf);
+
+ /**
+ * @param conf configuration
+ * @return True if using load-balancing between Sentry servers
+ */
+ boolean isLoadBalancingEnabled(Configuration conf);
+
+ /**
+ * @param conf configuration
+ * @return true if transport pools are enabled
+ */
+ boolean isTransportPoolEnabled(Configuration conf);
}
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java
index 651173e..4caa6b6 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java
@@ -19,6 +19,8 @@
package org.apache.sentry.core.common.transport;
+import java.util.concurrent.TimeUnit;
+
/**
* Defines configuration strings needed for sentry thrift clients to handle the transport level
* operations.
@@ -27,7 +29,7 @@
* Clients that needs these configuration string use the implementations of interface
* <code>SentryClientTransportConfigInterface</code>.
*/
-class SentryClientTransportConstants {
+public final class SentryClientTransportConstants {
/**
* max retry num for client rpc
@@ -44,8 +46,18 @@
"sentry.service.client.connection.full.retry-total";
static final int SENTRY_FULL_RETRY_TOTAL_DEFAULT = 2;
+ /**
+ * Enable load balancing between servers
+ */
+ static final String SENTRY_CLIENT_LOAD_BALANCING =
+ "sentry.service.client.connection.loadbalance";
+ static final boolean SENTRY_CLIENT_LOAD_BALANCING_DEFAULT = true;
+
static final int RPC_PORT_DEFAULT = 8038;
+ private SentryClientTransportConstants() {
+ }
+
/**
* Defines configuration strings needed for sentry thrift policy clients to handle
* the transport level operations.
@@ -57,7 +69,6 @@
//configuration for server address. It can be coma seperated list of server addresses.
static final String SERVER_RPC_ADDRESS = "sentry.service.client.server.rpc-address";
-
/**
* This configuration parameter is only meant to be used for testing purposes.
*/
@@ -72,7 +83,7 @@
static final int SENTRY_FULL_RETRY_TOTAL_DEFAULT =
SentryClientTransportConstants.SENTRY_FULL_RETRY_TOTAL_DEFAULT;
- static final String SECURITY_USE_UGI_TRANSPORT = "sentry.service.security.use.ugi";
+ public static final String SECURITY_USE_UGI_TRANSPORT = "sentry.service.security.use.ugi";
static final String PRINCIPAL = "sentry.service.server.principal";
//configration for the client connection timeout.
@@ -88,35 +99,39 @@
static final String SENTRY_RPC_RETRY_TOTAL = "sentry.service.client.rpc.retry-total";
static final int SENTRY_RPC_RETRY_TOTAL_DEFAULT = 3;
- // connection pool configuration
- static final String SENTRY_POOL_ENABLED = "sentry.service.client.connection.pool.enabled";
- static final boolean SENTRY_POOL_ENABLED_DEFAULT = false;
+ // commons-pool configuration
+ static final String SENTRY_POOL_ENABLE = "sentry.service.client.connection.pool.enabled";
+ static final boolean SENTRY_POOL_ENABLE_DEFAULT = true;
- // commons-pool configuration for pool size
+ /** Allow unlimited number of idle connections */
static final String SENTRY_POOL_MAX_TOTAL = "sentry.service.client.connection.pool.max-total";
- static final int SENTRY_POOL_MAX_TOTAL_DEFAULT = 8;
+ static final int SENTRY_POOL_MAX_TOTAL_DEFAULT = -1;
static final String SENTRY_POOL_MAX_IDLE = "sentry.service.client.connection.pool.max-idle";
- static final int SENTRY_POOL_MAX_IDLE_DEFAULT = 8;
+ static final int SENTRY_POOL_MAX_IDLE_DEFAULT = 256;
static final String SENTRY_POOL_MIN_IDLE = "sentry.service.client.connection.pool.min-idle";
- static final int SENTRY_POOL_MIN_IDLE_DEFAULT = 0;
+ static final int SENTRY_POOL_MIN_IDLE_DEFAULT = 16;
+ static final String SENTRY_POOL_MIN_EVICTION_TIME_SEC =
+ "sentry.service.client.connection.pool.eviction.mintime.sec";
+ // 2 minutes seconds min time before eviction
+ static final long SENTRY_POOL_MIN_EVICTION_TIME_SEC_DEFAULT =
+ TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);;
+ static final String SENTRY_POOL_EVICTION_INTERVAL_SEC =
+ "sentry.service.client.connection.pool.eviction.interval.sec";
+ // Run eviction thread every minute
+ static final long SENTRY_POOL_EVICTION_INTERVAL_SEC_DEFAULT =
+ TimeUnit.MILLISECONDS.convert(1L, TimeUnit.MINUTES);
- // configuration to load balance the connections to the configured sentry servers
- static final String SENTRY_CLIENT_LOAD_BALANCING = "sentry.service.client.connection.loadbalance";
- static final boolean SENTRY_CLIENT_LOAD_BALANCING_DEFAULT = true;
-
- // retry num for getting the connection from connection pool
- static final String SENTRY_POOL_RETRY_TOTAL =
- SentryClientTransportConstants.SENTRY_FULL_RETRY_TOTAL;
- static final int SENTRY_POOL_RETRY_TOTAL_DEFAULT =
- SentryClientTransportConstants.SENTRY_RPC_RETRY_TOTAL_DEFAULT;
-
+ static final String SENTRY_CLIENT_LOAD_BALANCING =
+ SentryClientTransportConstants.SENTRY_CLIENT_LOAD_BALANCING;
+ static final boolean SENTRY_CLIENT_LOAD_BALANCING_DEFAULT =
+ SentryClientTransportConstants.SENTRY_CLIENT_LOAD_BALANCING_DEFAULT;
}
/**
* Defines configuration strings needed for sentry HDFS clients to handle the transport level
* operations.
*/
- static class HDFSClientConstants {
+ public static class HDFSClientConstants {
//Default server port
static final int SERVER_RPC_PORT_DEFAULT = SentryClientTransportConstants.RPC_PORT_DEFAULT;
@@ -142,14 +157,10 @@
static final int SENTRY_FULL_RETRY_TOTAL_DEFAULT =
SentryClientTransportConstants.SENTRY_FULL_RETRY_TOTAL_DEFAULT;
- static final String SECURITY_USE_UGI_TRANSPORT = "sentry.hdfs.service.security.use.ugi";
+ public static final String SECURITY_USE_UGI_TRANSPORT = "sentry.hdfs.service.security.use.ugi";
static final String PRINCIPAL = "sentry.hdfs.service.server.principal";
- static final String RPC_ADDRESS = "sentry.hdfs.service.client.server.rpc-address";
-
- static final String RPC_ADDRESS_DEFAULT = "0.0.0.0"; //NOPMD
-
//configration for the client connection timeout.
static final String SERVER_RPC_CONN_TIMEOUT =
"sentry.hdfs.service.client.server.rpc-connection-timeout";
@@ -165,8 +176,28 @@
static final int SENTRY_RPC_RETRY_TOTAL_DEFAULT = 3;
- // configuration to load balance the connections to the configured sentry servers
- static final String SENTRY_CLIENT_LOAD_BALANCING = "sentry.hdfs.service.client.connection.loadbalance";
- static final boolean SENTRY_CLIENT_LOAD_BALANCING_DEFAULT = true;
+ // commons-pool configuration - disable pool for HDFS clients
+ static final String SENTRY_POOL_ENABLE = "sentry.hdfs.service.client.connection.pool.enable";
+ static final boolean SENTRY_POOL_ENABLE_DEFAULT = false;
+
+ /** Total maximum number of open connections. There shouldn't be many. */
+ static final String SENTRY_POOL_MAX_TOTAL = "sentry.hdfs.service.client.connection.pool.max-total";
+ static final int SENTRY_POOL_MAX_TOTAL_DEFAULT = 16;
+ /** Maximum number of idle connections to keep */
+ static final String SENTRY_POOL_MAX_IDLE = "sentry.hdfs.service.client.connection.pool.max-idle";
+ static final int SENTRY_POOL_MAX_IDLE_DEFAULT = 2;
+ static final String SENTRY_POOL_MIN_IDLE = "sentry.hdfs.service.client.connection.pool.min-idle";
+ static final int SENTRY_POOL_MIN_IDLE_DEFAULT = 1;
+ static final String SENTRY_POOL_MIN_EVICTION_TIME_SEC =
+ "sentry.hdfs.service.client.connection.pool.eviction.mintime.sec";
+ // No evictions for HDFS connections by default
+ static final long SENTRY_POOL_MIN_EVICTION_TIME_SEC_DEFAULT = 0L;
+ static final String SENTRY_POOL_EVICTION_INTERVAL_SEC =
+ "sentry.hdfs.service.client.connection.pool.eviction.interval.sec";
+ static final long SENTRY_POOL_EVICTION_INTERVAL_SEC_DEFAULT = -1L;
+ static final String SENTRY_CLIENT_LOAD_BALANCING =
+ SentryClientTransportConstants.SENTRY_CLIENT_LOAD_BALANCING;
+ static final boolean SENTRY_CLIENT_LOAD_BALANCING_DEFAULT =
+ SentryClientTransportConstants.SENTRY_CLIENT_LOAD_BALANCING_DEFAULT;
}
}
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryConnection.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryConnection.java
new file mode 100644
index 0000000..b5f2bcf
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryConnection.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.common.transport;
+
+/**
+ * Representation of a connection to a Sentry Server.
+ * <ul>
+ * <li>Connection is initialized using the {@link #connect()} method.</li>
+ * <li>When the connection is no longer used, the {@link #done()} method should be called to
+ * deallocate any resources.</li>
+ * <li>If the user detected that connection is broken, they should call
+ * {@link #invalidate()} method. The connection can not be used after that.</li>
+ * </ul>
+ */
+public interface SentryConnection {
+ /**
+ * Connect to Sentry server.
+ * Either creates a new connection or reuses an existing one.
+ * @throws Exception on failure to connect.
+ */
+ void connect() throws Exception;
+
+ /**
+ * Disconnect from the server. May close connection or return it to a
+ * pool for reuse.
+ */
+ void done();
+
+ /**
+ * The connection is assumed to be non-working, invalidate it.
+ * Subsequent {@link #connect() call} should attempt to obtain
+ * another connection.
+ * <p>
+ * The implementation may attempt to connect
+ * to another server immediately or delay it till the call to
+ * {@link #connect()}.
+ */
+ void invalidate();
+}
\ No newline at end of file
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java
index 2d80827..1724e7f 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java
@@ -32,63 +32,92 @@
*/
public final class SentryHDFSClientTransportConfig
implements SentryClientTransportConfigInterface {
- public SentryHDFSClientTransportConfig() { }
@Override
- public boolean isKerberosEnabled(Configuration conf) throws MissingConfigurationException {
+ public boolean isKerberosEnabled(Configuration conf) {
return (conf.get(SECURITY_MODE, SentryConstants.KERBEROS_MODE).trim()
.equalsIgnoreCase((SentryConstants.KERBEROS_MODE)));
}
@Override
- public int getSentryFullRetryTotal(Configuration conf) throws MissingConfigurationException {
- return conf.getInt(SENTRY_FULL_RETRY_TOTAL, SENTRY_FULL_RETRY_TOTAL_DEFAULT);
- }
-
- @Override
public int getSentryRpcRetryTotal(Configuration conf) {
return conf.getInt(SENTRY_RPC_RETRY_TOTAL, SENTRY_RPC_RETRY_TOTAL_DEFAULT);
}
@Override
- public boolean useUserGroupInformation(Configuration conf)
- throws MissingConfigurationException {
- return Boolean.valueOf(conf.get(SECURITY_USE_UGI_TRANSPORT, "true"));
+ public boolean useUserGroupInformation(Configuration conf) {
+ return Boolean.parseBoolean(conf.get(SECURITY_USE_UGI_TRANSPORT, "true"));
}
+ /**
+ * @throws MissingConfigurationException
+ */
@Override
- public String getSentryPrincipal(Configuration conf) throws MissingConfigurationException {
+ public String getSentryPrincipal(Configuration conf) {
String principle = conf.get(PRINCIPAL);
- if (principle != null && !principle.isEmpty()) {
+ if ((principle != null) && !principle.isEmpty()) {
return principle;
}
throw new MissingConfigurationException(PRINCIPAL);
}
+ /**
+ * @throws MissingConfigurationException
+ */
@Override
- public String getSentryServerRpcAddress(Configuration conf)
- throws MissingConfigurationException {
+ public String getSentryServerRpcAddress(Configuration conf) {
String serverAddress = conf.get(SERVER_RPC_ADDRESS);
- if (serverAddress != null && !serverAddress.isEmpty()) {
+ if ((serverAddress != null) && !serverAddress.isEmpty()) {
return serverAddress;
}
throw new MissingConfigurationException(SERVER_RPC_ADDRESS);
}
@Override
- public int getServerRpcPort(Configuration conf) throws MissingConfigurationException {
+ public int getServerRpcPort(Configuration conf) {
return conf.getInt(SERVER_RPC_PORT, SentryClientTransportConstants.RPC_PORT_DEFAULT);
}
@Override
- public int getServerRpcConnTimeoutInMs(Configuration conf)
- throws MissingConfigurationException {
+ public int getServerRpcConnTimeoutInMs(Configuration conf) {
return conf.getInt(SERVER_RPC_CONN_TIMEOUT, SERVER_RPC_CONN_TIMEOUT_DEFAULT);
}
@Override
- public boolean isLoadBalancingEnabled(Configuration conf)
- throws MissingConfigurationException {
- return conf.getBoolean(SENTRY_CLIENT_LOAD_BALANCING, SENTRY_CLIENT_LOAD_BALANCING_DEFAULT);
+ public int getPoolMaxTotal(Configuration conf) {
+ return conf.getInt(SENTRY_POOL_MAX_TOTAL, SENTRY_POOL_MAX_TOTAL_DEFAULT);
+ }
+
+ @Override
+ public int getPoolMinIdle(Configuration conf) {
+ return conf.getInt(SENTRY_POOL_MIN_IDLE, SENTRY_POOL_MIN_IDLE_DEFAULT);
+ }
+
+ @Override
+ public int getPoolMaxIdle(Configuration conf) {
+ return conf.getInt(SENTRY_POOL_MAX_IDLE, SENTRY_POOL_MAX_IDLE_DEFAULT);
+ }
+
+ @Override
+ public long getMinEvictableTimeSec(Configuration conf) {
+ return conf.getLong(SENTRY_POOL_MIN_EVICTION_TIME_SEC,
+ SENTRY_POOL_MIN_EVICTION_TIME_SEC_DEFAULT);
+ }
+
+ @Override
+ public long getTimeBetweenEvictionRunsSec(Configuration conf) {
+ return conf.getLong(SENTRY_POOL_EVICTION_INTERVAL_SEC,
+ SENTRY_POOL_EVICTION_INTERVAL_SEC_DEFAULT);
+ }
+
+ @Override
+ public boolean isLoadBalancingEnabled(Configuration conf) {
+ return conf.getBoolean(SENTRY_CLIENT_LOAD_BALANCING,
+ SENTRY_CLIENT_LOAD_BALANCING_DEFAULT);
+ }
+
+ @Override
+ public boolean isTransportPoolEnabled(Configuration conf) {
+ return conf.getBoolean(SENTRY_POOL_ENABLE, SENTRY_POOL_ENABLE_DEFAULT);
}
}
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java
index c97fe97..45522df 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.core.common.exception.MissingConfigurationException;
import org.apache.sentry.core.common.utils.SentryConstants;
+
import static org.apache.sentry.core.common.transport.SentryClientTransportConstants.PolicyClientConstants.*;
/**
@@ -32,63 +33,91 @@
*/
public final class SentryPolicyClientTransportConfig
implements SentryClientTransportConfigInterface {
- public SentryPolicyClientTransportConfig() { }
@Override
- public boolean isKerberosEnabled(Configuration conf) throws MissingConfigurationException {
+ public boolean isKerberosEnabled(Configuration conf) {
return (conf.get(SECURITY_MODE, SentryConstants.KERBEROS_MODE).trim()
.equalsIgnoreCase((SentryConstants.KERBEROS_MODE)));
}
@Override
- public int getSentryFullRetryTotal(Configuration conf) throws MissingConfigurationException {
- return conf.getInt(SENTRY_FULL_RETRY_TOTAL, SENTRY_FULL_RETRY_TOTAL_DEFAULT);
- }
-
- @Override
public int getSentryRpcRetryTotal(Configuration conf) {
return conf.getInt(SENTRY_RPC_RETRY_TOTAL, SENTRY_RPC_RETRY_TOTAL_DEFAULT);
}
@Override
- public boolean useUserGroupInformation(Configuration conf)
- throws MissingConfigurationException {
+ public boolean useUserGroupInformation(Configuration conf) {
return Boolean.valueOf(conf.get(SECURITY_USE_UGI_TRANSPORT, "true"));
}
+ /**
+ * @throws MissingConfigurationException
+ */
@Override
- public String getSentryPrincipal(Configuration conf) throws MissingConfigurationException {
+ public String getSentryPrincipal(Configuration conf) {
String principle = conf.get(PRINCIPAL);
- if (principle != null && !principle.isEmpty()) {
+ if ((principle != null) && !principle.isEmpty()) {
return principle;
}
throw new MissingConfigurationException(PRINCIPAL);
}
+ /**
+ * @throws MissingConfigurationException
+ */
@Override
- public String getSentryServerRpcAddress(Configuration conf)
- throws MissingConfigurationException {
+ public String getSentryServerRpcAddress(Configuration conf) {
String serverAddress = conf.get(SERVER_RPC_ADDRESS);
- if (serverAddress != null && !serverAddress.isEmpty()) {
+ if ((serverAddress != null) && !serverAddress.isEmpty()) {
return serverAddress;
}
throw new MissingConfigurationException(SERVER_RPC_ADDRESS);
}
@Override
- public int getServerRpcPort(Configuration conf) throws MissingConfigurationException {
+ public int getServerRpcPort(Configuration conf) {
return conf.getInt(SERVER_RPC_PORT, SentryClientTransportConstants.RPC_PORT_DEFAULT);
}
@Override
- public int getServerRpcConnTimeoutInMs(Configuration conf)
- throws MissingConfigurationException {
+ public int getServerRpcConnTimeoutInMs(Configuration conf) {
return conf.getInt(SERVER_RPC_CONN_TIMEOUT, SERVER_RPC_CONN_TIMEOUT_DEFAULT);
}
@Override
- public boolean isLoadBalancingEnabled(Configuration conf)
- throws MissingConfigurationException {
+ public int getPoolMaxTotal(Configuration conf) {
+ return conf.getInt(SENTRY_POOL_MAX_TOTAL, SENTRY_POOL_MAX_TOTAL_DEFAULT);
+ }
+
+ @Override
+ public int getPoolMinIdle(Configuration conf) {
+ return conf.getInt(SENTRY_POOL_MIN_IDLE, SENTRY_POOL_MIN_IDLE_DEFAULT);
+ }
+
+ @Override
+ public int getPoolMaxIdle(Configuration conf) {
+ return conf.getInt(SENTRY_POOL_MAX_IDLE, SENTRY_POOL_MAX_IDLE_DEFAULT);
+ }
+
+ @Override
+ public long getMinEvictableTimeSec(Configuration conf) {
+ return conf.getLong(SENTRY_POOL_MIN_EVICTION_TIME_SEC,
+ SENTRY_POOL_MIN_EVICTION_TIME_SEC_DEFAULT);
+ }
+
+ @Override
+ public long getTimeBetweenEvictionRunsSec(Configuration conf) {
+ return conf.getLong(SENTRY_POOL_EVICTION_INTERVAL_SEC,
+ SENTRY_POOL_EVICTION_INTERVAL_SEC_DEFAULT);
+ }
+
+ @Override
+ public boolean isLoadBalancingEnabled(Configuration conf) {
return conf.getBoolean(SENTRY_CLIENT_LOAD_BALANCING, SENTRY_CLIENT_LOAD_BALANCING_DEFAULT);
}
+
+ @Override
+ public boolean isTransportPoolEnabled(Configuration conf) {
+ return conf.getBoolean(SENTRY_POOL_ENABLE, SENTRY_POOL_ENABLE_DEFAULT);
+ }
}
\ No newline at end of file
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java
deleted file mode 100644
index 9a10ca5..0000000
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.core.common.transport;
-
-/**
- * Client interface for Proxy Invocation handlers
- * <p>
- * Defines interface that Sentry client's should expose to the Invocation handlers like
- * <code>RetryClientInvocationHandler</code> used to proxy the method invocation on sentry
- * client instances .
- * <p>
- * All the sentry clients that need retrying and failover capabilities should implement
- * this interface.
- */
-public interface SentryServiceClient {
- /**
- * Connect to Sentry server.
- * Either creates a new connection or reuses an existing one.
- * @throws Exception on failure to acquire a transport towards server.
- */
- void connect() throws Exception;
-
- /**
- * Disconnect from the server. May close connection or return it to a
- * pool for reuse.
- */
- void disconnect();
-}
\ No newline at end of file
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java
index 74aced2..b41bdfd 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,57 +22,136 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.net.HostAndPort;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.sentry.core.common.exception.MissingConfigurationException;
import org.apache.thrift.transport.TSaslClientTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
-import org.apache.sentry.core.common.utils.ThriftUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.concurrent.ThreadSafe;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collections;
/**
- * Create Thrift transports suitable for talking to Sentry
+ * Factory for producing connected Thrift transports.
+ * It can produce regular transports as well as Kerberos-enabled transports.
+ * <p>
+ * This class is immutable and thus thread-safe.
*/
-
-public class SentryTransportFactory {
- protected final Configuration conf;
- private String[] serverPrincipalParts;
- protected TTransport thriftTransport;
- private final int connectionTimeout;
+@ThreadSafe
+public final class SentryTransportFactory implements TransportFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(SentryTransportFactory.class);
- // configs for connection retry
- private final int connectionFullRetryTotal;
- private final ArrayList<InetSocketAddress> endpoints;
- private final SentryClientTransportConfigInterface transportConfig;
+
+ private final Configuration conf;
+ private final boolean useUgi;
+ private final String serverPrincipal;
+ private final int connectionTimeout;
+ private final boolean isKerberosEnabled;
private static final ImmutableMap<String, String> SASL_PROPERTIES =
ImmutableMap.of(Sasl.SERVER_AUTH, "true", Sasl.QOP, "auth-conf");
/**
+ * Initialize the object based on the sentry configuration provided.
+ *
+ * @param conf Sentry configuration
+ * @param transportConfig transport configuration to use
+ */
+ public SentryTransportFactory(Configuration conf,
+ SentryClientTransportConfigInterface transportConfig) {
+
+ this.conf = conf;
+ Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
+ connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf);
+ isKerberosEnabled = transportConfig.isKerberosEnabled(conf);
+ if (isKerberosEnabled) {
+ useUgi = transportConfig.useUserGroupInformation(conf);
+ serverPrincipal = transportConfig.getSentryPrincipal(conf);
+ } else {
+ serverPrincipal = null;
+ useUgi = false;
+ }
+ }
+
+ /**
+ * Connect to the endpoint and return a connected Thrift transport.
+ * @return Connection to the endpoint
+ * @throws IOException
+ */
+ @Override
+ public TTransportWrapper getTransport(HostAndPort endpoint) throws IOException {
+ return new TTransportWrapper(connectToServer(new InetSocketAddress(endpoint.getHostText(),
+ endpoint.getPort())),
+ endpoint);
+ }
+
+ /**
+ * Connect to the specified socket address and throw IOException if failed.
+ *
+ * @param serverAddress Address client needs to connect
+ * @throws Exception if there is failure in establishing the connection.
+ */
+ private TTransport connectToServer(InetSocketAddress serverAddress) throws IOException {
+ try {
+ TTransport thriftTransport = createTransport(serverAddress);
+ thriftTransport.open();
+ return thriftTransport;
+ } catch (TTransportException e) {
+ throw new IOException("Failed to open transport: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Create transport given InetSocketAddress
+ * @param serverAddress - endpoint address
+ * @return unconnected transport
+ * @throws TTransportException
+ * @throws IOException
+ */
+ private TTransport createTransport(InetSocketAddress serverAddress)
+ throws IOException {
+ String hostName = serverAddress.getHostName();
+ int port = serverAddress.getPort();
+ TTransport socket = new TSocket(hostName, port, connectionTimeout);
+
+ if (!isKerberosEnabled) {
+ LOGGER.debug("created unprotected connection to {}:{} ", hostName, port);
+ return socket;
+ }
+
+ String principal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress());
+ String[] serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal);
+ if (serverPrincipalParts.length != 3) {
+ throw new IOException("Kerberos principal should have 3 parts: " + principal);
+ }
+
+ UgiSaslClientTransport connection =
+ new UgiSaslClientTransport(SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(),
+ serverPrincipalParts[0], serverPrincipalParts[1],
+ socket, useUgi);
+
+ LOGGER.debug("created secured connection to {}:{} ", hostName, port);
+ return connection;
+ }
+
+ /**
* This transport wraps the Sasl transports to set up the right UGI context for open().
*/
- public static class UgiSaslClientTransport extends TSaslClientTransport {
- UserGroupInformation ugi = null;
+ private static class UgiSaslClientTransport extends TSaslClientTransport {
+ private UserGroupInformation ugi = null;
- public UgiSaslClientTransport(String mechanism, String protocol,
- String serverName, TTransport transport,
- boolean wrapUgi, Configuration conf)
- throws IOException, SaslException {
+ UgiSaslClientTransport(String mechanism, String protocol,
+ String serverName, TTransport transport,
+ boolean wrapUgi)
+ throws IOException, SaslException {
super(mechanism, null, protocol, serverName, SASL_PROPERTIES, null,
- transport);
+ transport);
if (wrapUgi) {
ugi = UserGroupInformation.getLoginUser();
}
@@ -98,8 +177,9 @@
} catch (IOException e) {
throw new TTransportException("Failed to open SASL transport: " + e.getMessage(), e);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new TTransportException(
- "Interrupted while opening underlying transport: " + e.getMessage(), e);
+ "Interrupted while opening underlying transport: " + e.getMessage(), e);
}
}
}
@@ -108,203 +188,4 @@
super.open();
}
}
-
- /**
- * Initialize the object based on the sentry configuration provided.
- * List of configured servers are reordered randomly preventing all
- * clients connecting to the same server.
- *
- * @param conf Sentry configuration
- * @param transportConfig transport configuration to use
- */
- public SentryTransportFactory(Configuration conf,
- SentryClientTransportConfigInterface transportConfig) throws IOException {
-
- this.conf = conf;
- Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
- serverPrincipalParts = null;
- this.transportConfig = transportConfig;
-
- try {
- this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf);
- this.connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf);
- if(transportConfig.isKerberosEnabled(conf) &&
- transportConfig.useUserGroupInformation(conf)) {
- // Re-initializing UserGroupInformation, if needed
- UserGroupInformationInitializer.initialize(conf);
- }
- String hostsAndPortsStr = transportConfig.getSentryServerRpcAddress(conf);
-
- int serverPort = transportConfig.getServerRpcPort(conf);
-
- String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
- HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, serverPort);
-
- this.endpoints = new ArrayList<>(hostsAndPortsStrArr.length);
- for (HostAndPort endpoint : hostsAndPorts) {
- this.endpoints.add(
- new InetSocketAddress(endpoint.getHostText(), endpoint.getPort()));
- LOGGER.debug("Added server endpoint: " + endpoint.toString());
- }
-
- if((endpoints.size() > 1) && (transportConfig.isLoadBalancingEnabled(conf))) {
- // Reorder endpoints randomly to prevent all clients connecting to the same endpoint
- // and load balance the connections towards sentry servers
- Collections.shuffle(endpoints);
- }
- } catch (MissingConfigurationException e) {
- throw new RuntimeException("Sentry Thrift Client Creation Failed: " + e.getMessage(), e);
- }
- }
-
- /**
- * Initialize object based on the parameters provided provided.
- *
- * @param addr Host address which the client needs to connect
- * @param port Host Port which the client needs to connect
- * @param conf Sentry configuration
- * @param transportConfig transport configuration to use
- */
- public SentryTransportFactory(String addr, int port, Configuration conf,
- SentryClientTransportConfigInterface transportConfig) throws IOException {
- // copy the configuration because we may make modifications to it.
- this.conf = new Configuration(conf);
- serverPrincipalParts = null;
- Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
- this.transportConfig = transportConfig;
-
- try {
- this.endpoints = new ArrayList<>(1);
- this.endpoints.add(NetUtils.createSocketAddr(addr, port));
- this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf);
- this.connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf);
- } catch (MissingConfigurationException e) {
- throw new RuntimeException("Sentry Thrift Client Creation Failed: " + e.getMessage(), e);
- }
- }
-
-
- /**
- * On connection error, Iterates through all the configured servers and tries to connect.
- * On successful connection, control returns
- * On connection failure, continues iterating through all the configured sentry servers,
- * and then retries the whole server list no more than connectionFullRetryTotal times.
- * In this case, it won't introduce more latency when some server fails.
- * <p>
- * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries.
- */
- public TTransport getTransport() throws IOException {
- IOException currentException = null;
- for (int retryCount = 0; retryCount < connectionFullRetryTotal; retryCount++) {
- try {
- return connectToAvailableServer();
- } catch (IOException e) {
- currentException = e;
- LOGGER.error(
- "Failed to connect to all the configured sentry servers, Retrying again");
- }
- }
- // Throws exception on reaching the connectionFullRetryTotal.
- LOGGER.error(
- String.format("Reach the max connection retry num %d ", connectionFullRetryTotal),
- currentException);
- throw currentException;
- }
-
- /**
- * Iterates through all the configured servers and tries to connect.
- * On connection error, tries to connect to next server.
- * Control returns on successful connection OR it's done trying to all the
- * configured servers.
- *
- * @throws IOException
- */
- private TTransport connectToAvailableServer() throws IOException {
- IOException currentException = null;
- for (InetSocketAddress addr : endpoints) {
- try {
- return connectToServer(addr);
- } catch (IOException e) {
- LOGGER.error(String.format("Failed connection to %s: %s",
- addr.toString(), e.getMessage()), e);
- currentException = e;
- }
- }
- throw currentException;
- }
-
- /**
- * Connect to the specified socket address and throw IOException if failed.
- *
- * @param serverAddress Address client needs to connect
- * @throws Exception if there is failure in establishing the connection.
- */
- private TTransport connectToServer(InetSocketAddress serverAddress) throws IOException {
- try {
- thriftTransport = createTransport(serverAddress);
- thriftTransport.open();
- } catch (TTransportException e) {
- throw new IOException("Failed to open transport: " + e.getMessage(), e);
- } catch (MissingConfigurationException e) {
- throw new RuntimeException(e.getMessage(), e);
- }
-
- LOGGER.debug("Successfully opened transport: " + thriftTransport + " to " + serverAddress);
- return thriftTransport;
- }
-
- /**
- * New socket is is created
- *
- * @param serverAddress
- * @return
- * @throws TTransportException
- * @throws MissingConfigurationException
- * @throws IOException
- */
- private TTransport createTransport(InetSocketAddress serverAddress)
- throws TTransportException, MissingConfigurationException, IOException {
- TTransport socket = new TSocket(serverAddress.getHostName(),
- serverAddress.getPort(), connectionTimeout);
-
- if (!transportConfig.isKerberosEnabled(conf)) {
- return socket;
- } else {
- String serverPrincipal = transportConfig.getSentryPrincipal(conf);
- serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress());
- LOGGER.debug("Using server kerberos principal: " + serverPrincipal);
- if (serverPrincipalParts == null) {
- serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal);
- Preconditions.checkArgument(serverPrincipalParts.length == 3,
- "Kerberos principal should have 3 parts: " + serverPrincipal);
- }
-
- boolean wrapUgi = transportConfig.useUserGroupInformation(conf);
- return new UgiSaslClientTransport(SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(),
- serverPrincipalParts[0], serverPrincipalParts[1],
- socket, wrapUgi, conf);
- }
- }
-
- private boolean isConnected() {
- return thriftTransport != null && thriftTransport.isOpen();
- }
-
- /**
- * Method currently closes the transport
- * TODO (Kalyan) Plan is to hold the transport and resuse it accross multiple client's
- * That way, new connection need not be created for each new client
- */
- public void releaseTransport() {
- close();
- }
-
- /**
- * Method closes the transport
- */
- public void close() {
- if (isConnected()) {
- thriftTransport.close();
- }
- }
}
\ No newline at end of file
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportPool.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportPool.java
new file mode 100644
index 0000000..e33dd2b
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportPool.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.core.common.transport;
+
+import com.google.common.base.Preconditions;
+import com.google.common.net.HostAndPort;
+import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.core.common.utils.ThriftUtil;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Pool of transport connections to Sentry server.
+ * The pool caches open connections to multiple Sentry servers,
+ * specified in the configuration.
+ *
+ * When transport pooling is disabled in configuration,
+ * creates transports directly.
+ */
+@ThreadSafe
+public final class SentryTransportPool implements AutoCloseable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SentryTransportPool.class);
+
+ // Used for logging to identify pool instances. This is only useful for test debugging
+ // so we do not preserve thread safety for this field.
+ private static int poolId = 0;
+ private final int id;
+
+ // True if using Object pool
+ private final boolean isPoolEnabled;
+
+ // Load balance between servers if true
+ private final boolean doLoadBalancing;
+
+ // List of all known servers
+ private final ArrayList<HostAndPort> endpoints;
+
+ // Transport pool which keeps connected transports
+ private final KeyedObjectPool<HostAndPort, TTransportWrapper> pool;
+ // Source of connected transports
+ private final TransportFactory transportFactory;
+
+ // Set when we are closed
+ private final AtomicBoolean closed = new AtomicBoolean();
+
+ /**
+ * Configure transport pool.
+ * <p>
+ * The pool accepts the following configuration:
+ * <ul>
+ * <li>Maximum total number of objects in the pool</li>
+ * <li>Minimum number of idle objects</li>
+ * <li>Maximum number of idle objects</li>
+ * <li>Minimum time before the object is evicted</li>
+ * <li>Interval between evictions</li>
+ * </ul>
+ * @param conf Configuration
+ * @param transportConfig Configuration interface
+ * @param transportFactory Transport factory used to produce transports
+ */
+ public SentryTransportPool(Configuration conf,
+ SentryClientTransportConfigInterface transportConfig,
+ TransportFactory transportFactory) {
+
+ // This isn't thread-safe, but we don't care - it is only used
+ // for debugging when running tests - normal apps use a single pool
+ poolId++;
+ id = poolId;
+
+ this.transportFactory = transportFactory;
+ doLoadBalancing = transportConfig.isLoadBalancingEnabled(conf);
+ isPoolEnabled = transportConfig.isTransportPoolEnabled(conf);
+
+ // Get list of server addresses
+ String hostsAndPortsStr = transportConfig.getSentryServerRpcAddress(conf);
+ int serverPort = transportConfig.getServerRpcPort(conf);
+ LOGGER.info("Creating pool for {} with default port {}",
+ hostsAndPortsStr, serverPort);
+ String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
+ Preconditions.checkArgument(hostsAndPortsStrArr.length > 0,
+ "At least one server should be specified");
+
+ endpoints = new ArrayList<>(hostsAndPortsStrArr.length);
+ for(String addr: hostsAndPortsStrArr) {
+ HostAndPort endpoint = ThriftUtil.parseAddress(addr, serverPort);
+ LOGGER.info("Adding endpoint {}", endpoint);
+ endpoints.add(endpoint);
+ }
+
+ // this.transportFactory = new SentryTransportFactory(conf, transportConfig);
+
+ if (!isPoolEnabled) {
+ pool = null;
+ LOGGER.info("Connection pooling is disabled");
+ return;
+ }
+
+ LOGGER.info("Connection pooling is enabled");
+ // Set pool configuration based on Configuration settings
+ GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig();
+
+ // Don't limit maximum number of objects in the pool
+ poolConfig.setMaxTotal(-1);
+
+ poolConfig.setMinIdlePerKey(transportConfig.getPoolMinIdle(conf));
+ poolConfig.setMaxIdlePerKey(transportConfig.getPoolMaxIdle(conf));
+
+ // Do not block when pool is exhausted, throw exception instead
+ poolConfig.setBlockWhenExhausted(false);
+ poolConfig.setTestOnReturn(true);
+
+ // No limit for total objects in the pool
+ poolConfig.setMaxTotalPerKey(transportConfig.getPoolMaxTotal(conf));
+ poolConfig.setMinEvictableIdleTimeMillis(transportConfig.getMinEvictableTimeSec(conf));
+ poolConfig.setTimeBetweenEvictionRunsMillis(transportConfig.getTimeBetweenEvictionRunsSec(conf));
+
+ // Create object pool
+ pool = new GenericKeyedObjectPool<>(new PoolFactory(this.transportFactory, id),
+ poolConfig);
+ }
+
+ public TTransportWrapper getTransport() throws Exception {
+ List<HostAndPort> servers;
+ // If we are doing load balancing and there is more then one server,
+ // shuffle them before obtaining connection
+ if (doLoadBalancing && (endpoints.size() > 1)) {
+ servers = new ArrayList<>(endpoints);
+ Collections.shuffle(servers);
+ } else {
+ servers = endpoints;
+ }
+
+ // Try to get a connection from one of the pools
+ Exception failure = null;
+ for(HostAndPort addr: servers) {
+ try {
+ TTransportWrapper transport =
+ isPoolEnabled ?
+ pool.borrowObject(addr) :
+ transportFactory.getTransport(addr);
+ LOGGER.debug("[{}] obtained transport {}", id, transport);
+ return transport;
+ } catch (IllegalStateException e) {
+ // Should not happen
+ LOGGER.error("Unexpected error from pool {}", id, e);
+ failure = e;
+ } catch (Exception e) {
+ LOGGER.error("Failed to obtain transport for {}: {}",
+ addr, e.getMessage());
+ failure = e;
+ }
+ }
+ // Failed to borrow connect to any endpoint
+ assert failure != null;
+ throw failure;
+ }
+
+ /**
+ * Return transport to the pool
+ * @param transport Open transport
+ */
+ public void returnTransport(TTransportWrapper transport) {
+ if (closed.get()) {
+ LOGGER.debug("Returned {} to closed pool", transport);
+ transport.close();
+ return;
+ }
+ try {
+ if (isPoolEnabled) {
+ LOGGER.debug("[{}] returning {}", id, transport);
+ pool.returnObject(transport.getAddress(), transport);
+ } else {
+ LOGGER.debug("Closing {}", transport);
+ transport.close();
+ }
+ } catch (Exception e) {
+ LOGGER.error("Failed to return {}", transport, e);
+ }
+ }
+
+ public void invalidateTransport(TTransportWrapper transport) {
+ if (closed.get()) {
+ LOGGER.debug("invalidated {} for closed pool", transport);
+ transport.close();
+ return;
+ }
+ try {
+ LOGGER.debug("[{}] Invalidating address {}", id, transport);
+ if (!isPoolEnabled) {
+ transport.close();
+ } else {
+ pool.invalidateObject(transport.getAddress(), transport);
+ // Invalidate the whole pool associated with the given address
+ // It is a bit brutal since a single bad connection may
+ // cause an invalidation, but otherwise we may have a lot of bad
+ // connections in the pool and try to return them.
+ pool.clear(transport.getAddress());
+ }
+ } catch (Exception e) {
+ LOGGER.error("Failed to invalidate {}", transport, e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (closed.get()) {
+ // already closed
+ return;
+ }
+ LOGGER.debug("[{}] closing", id);
+ if (pool != null) {
+ LOGGER.debug("Closing pool of {}/{} endpoints",
+ pool.getNumIdle(), pool.getNumActive());
+ pool.close();
+ }
+ }
+
+ /**
+ * Factory that creates and destroys pool objects
+ */
+ private static final class PoolFactory
+ extends BaseKeyedPooledObjectFactory<HostAndPort, TTransportWrapper> {
+ private final TransportFactory transportFactory;
+ private final int id;
+
+ /**
+ * Create a pool factory associated with the given transport factory
+ * @param transportFactory - factory producing transports
+ * @param id pool id (for debugging)
+ */
+ private PoolFactory(TransportFactory transportFactory, int id) {
+ this.transportFactory = transportFactory;
+ this.id = id;
+ }
+
+ @Override
+ public boolean validateObject(HostAndPort key, PooledObject<TTransportWrapper> p) {
+ TTransportWrapper transport = p.getObject();
+ if (transport == null) {
+ LOGGER.error("No transport to validate");
+ return false;
+ }
+ if (transport.getAddress() != key) {
+ LOGGER.error("Invalid endpoint {}: does not match {}", transport, key);
+ return false;
+ }
+ try {
+ transport.flush();
+ } catch (TTransportException e) {
+ LOGGER.error("Failed to verify connection to {}", key, e);
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public TTransportWrapper create(HostAndPort key) throws Exception {
+ TTransportWrapper transportWrapper = transportFactory.getTransport(key);
+ LOGGER.debug("[{}] created {}", id, transportWrapper);
+ return transportWrapper;
+ }
+
+ @Override
+ public void destroyObject(HostAndPort key, PooledObject<TTransportWrapper> p) throws Exception {
+ TTransportWrapper transport = p.getObject();
+ if (transport != null) {
+ LOGGER.debug("[{}] Destroying endpoint {}", id, transport);
+ try {
+ transport.close();
+ } catch (RuntimeException e) {
+ LOGGER.error("fail to destroy endpoint {}", transport, e);
+ }
+ }
+ }
+
+ @Override
+ public PooledObject<TTransportWrapper> wrap(TTransportWrapper value) {
+ return new DefaultPooledObject<>(value);
+ }
+ }
+
+}
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/TTransportWrapper.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/TTransportWrapper.java
new file mode 100644
index 0000000..07283fb
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/TTransportWrapper.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.core.common.transport;
+
+import com.google.common.net.HostAndPort;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.Closeable;
+
+/**
+ * Extension of Thrift Transport which also provides the endpoint address.
+ * The address is represented as {@link HostAndPort} object.
+ */
+public final class TTransportWrapper implements Closeable {
+ private final TTransport transport;
+ private final HostAndPort address;
+
+ /**
+ * @param transport Thrift transport (may be in any state)
+ * @param address The address associated with this transport.
+ */
+ TTransportWrapper(TTransport transport, HostAndPort address) {
+ this.transport = transport;
+ this.address = address;
+ }
+
+ /**
+ * @return Thrift transport value
+ */
+ public TTransport getTTransport() {
+ return transport;
+ }
+
+ /**
+ * @return endpoint address for the transport
+ */
+ public HostAndPort getAddress() {
+ return address;
+ }
+
+ /**
+ * @return True if and only if the transport is open
+ */
+ public boolean isOpen() {
+ return transport.isOpen();
+ }
+
+ /**
+ * Flush the underlying transport
+ * @throws TTransportException
+ */
+ public void flush() throws TTransportException {
+ transport.flush();
+ }
+
+ /**
+ * @return human-readable representation of a transport.
+ * It includes the endpoint address and open/closed state.
+ */
+ @Override
+ public String toString() {
+ return address.toString();
+ }
+
+ /**
+ * Close the underlying transport
+ */
+ @Override
+ public void close() {
+ transport.close();
+ }
+}
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/TransportFactory.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/TransportFactory.java
new file mode 100644
index 0000000..e115cbb
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/TransportFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.core.common.transport;
+
+import com.google.common.net.HostAndPort;
+
+import java.io.IOException;
+
+/**
+ * Generic transport factory interface.
+ * <p>
+ * The intention is to implement transport pool in more abstract terms
+ * and be able to test it without actually connecting to any servers by
+ * implementing mock transport factories.
+ */
+public interface TransportFactory {
+ /**
+ * Connect to the endpoint and return a connected Thrift transport.
+ * @return Connection to the endpoint
+ * @throws IOException
+ */
+ TTransportWrapper getTransport(HostAndPort endpoint) throws IOException;
+}
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java
index d9fab86..0fffb51 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java
@@ -116,12 +116,8 @@
* (host:port). The hostname could be in ipv6 style. If port is not specified,
* defaultPort will be used.
*/
- public static HostAndPort[] parseHostPortStrings(String[] hostsAndPortsArr, int defaultPort) {
- HostAndPort[] hostsAndPorts = new HostAndPort[hostsAndPortsArr.length];
- for (int i = 0; i < hostsAndPorts.length; i++) {
- hostsAndPorts[i] =
- HostAndPort.fromString(hostsAndPortsArr[i]).withDefaultPort(defaultPort);
- }
- return hostsAndPorts;
+ public static HostAndPort parseAddress(String address, int defaultPort) {
+ return HostAndPort.fromString(address).withDefaultPort(defaultPort);
}
+
}
\ No newline at end of file
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java
index 34caa0e..7304fd8 100644
--- a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java
+++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java
@@ -21,7 +21,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SentryUpdater {
+class SentryUpdater {
private SentryHDFSServiceClient sentryClient;
private final Configuration conf;
@@ -29,12 +29,12 @@
private static final Logger LOG = LoggerFactory.getLogger(SentryUpdater.class);
- public SentryUpdater(Configuration conf, SentryAuthorizationInfo authzInfo) throws Exception {
+ SentryUpdater(Configuration conf, SentryAuthorizationInfo authzInfo) throws Exception {
this.conf = conf;
this.authzInfo = authzInfo;
}
- public SentryAuthzUpdate getUpdates() {
+ SentryAuthzUpdate getUpdates() {
if (sentryClient == null) {
try {
sentryClient = SentryHDFSServiceClientFactory.create(conf);
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
index 11f6894..49d2360 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
@@ -17,19 +17,9 @@
*/
package org.apache.sentry.hdfs;
-import org.apache.sentry.core.common.exception.SentryHdfsServiceException;
-
-public interface SentryHDFSServiceClient {
+public interface SentryHDFSServiceClient extends AutoCloseable {
String SENTRY_HDFS_SERVICE_NAME = "SentryHDFSService";
- void notifyHMSUpdate(PathsUpdate update)
- throws SentryHdfsServiceException;
-
- long getLastSeenHMSPathSeqNum() throws SentryHdfsServiceException;
-
- SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum)
- throws SentryHdfsServiceException;
-
- void close();
+ SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum);
}
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
index 798bbef..1cdbb85 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
@@ -17,15 +17,12 @@
*/
package org.apache.sentry.hdfs;
-import java.io.IOException;
-import java.util.LinkedList;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.core.common.exception.SentryHdfsServiceException;
-import org.apache.sentry.core.common.transport.SentryHDFSClientTransportConfig;
-import org.apache.sentry.core.common.transport.SentryServiceClient;
-import org.apache.sentry.core.common.transport.SentryTransportFactory;
-import org.apache.sentry.hdfs.service.thrift.SentryHDFSService;
+import org.apache.sentry.core.common.transport.SentryConnection;
+import org.apache.sentry.core.common.transport.SentryTransportPool;
+import org.apache.sentry.core.common.transport.TTransportWrapper;
+import org.apache.sentry.hdfs.ServiceConstants.ClientConfig;
import org.apache.sentry.hdfs.service.thrift.SentryHDFSService.Client;
import org.apache.sentry.hdfs.service.thrift.TAuthzUpdateResponse;
import org.apache.sentry.hdfs.service.thrift.TPathsUpdate;
@@ -35,82 +32,57 @@
import org.apache.thrift.protocol.TMultiplexedProtocol;
import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.LinkedList;
/**
* Sentry HDFS Service Client
* <p>
- * The public implementation of SentryHDFSServiceClient.
- * A Sentry Client in which all the operations are synchronized for thread safety
- * Note: When using this client, if there is an exception in RPC, socket can get into an inconsistent state.
- * So it is important to close and re-open the transport so that new socket is used.
+ * The class isn't thread-safe - it is up to the aller to ensure thread safety
*/
-
-public class SentryHDFSServiceClientDefaultImpl implements SentryHDFSServiceClient, SentryServiceClient {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceClientDefaultImpl.class);
+public class SentryHDFSServiceClientDefaultImpl
+ implements SentryHDFSServiceClient, SentryConnection {
+ private final boolean useCompactTransport;
private Client client;
- private SentryTransportFactory transportFactory;
- private TTransport transport;
- private Configuration conf;
+ private final SentryTransportPool transportPool;
+ private TTransportWrapper transport;
+ private final long maxMessageSize;
- public SentryHDFSServiceClientDefaultImpl(Configuration conf, SentryHDFSClientTransportConfig transportConfig) throws IOException {
- transportFactory = new SentryTransportFactory(conf, transportConfig);
- this.conf = conf;
+ SentryHDFSServiceClientDefaultImpl(Configuration conf,
+ SentryTransportPool transportPool) throws IOException {
+ maxMessageSize = conf.getLong(ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE,
+ ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
+ useCompactTransport = conf.getBoolean(ClientConfig.USE_COMPACT_TRANSPORT,
+ ClientConfig.USE_COMPACT_TRANSPORT_DEFAULT);
+ this.transportPool = transportPool;
}
/**
* Connect to the sentry server
*
- * @throws IOException
+ * @throws Exception
*/
@Override
- public synchronized void connect() throws IOException {
- if (transport != null && transport.isOpen()) {
+ public void connect() throws Exception {
+ if ((transport != null) && transport.isOpen()) {
return;
}
- transport = transportFactory.getTransport();
- TProtocol tProtocol = null;
- long maxMessageSize = conf.getLong(ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE,
- ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
- if (conf.getBoolean(ServiceConstants.ClientConfig.USE_COMPACT_TRANSPORT,
- ServiceConstants.ClientConfig.USE_COMPACT_TRANSPORT_DEFAULT)) {
- tProtocol = new TCompactProtocol(transport, maxMessageSize, maxMessageSize);
+ transport = transportPool.getTransport();
+ TProtocol tProtocol;
+ if (useCompactTransport) {
+ tProtocol = new TCompactProtocol(transport.getTTransport(), maxMessageSize, maxMessageSize);
} else {
- tProtocol = new TBinaryProtocol(transport, maxMessageSize, maxMessageSize, true, true);
+ tProtocol = new TBinaryProtocol(transport.getTTransport(), maxMessageSize, maxMessageSize, true, true);
}
TMultiplexedProtocol protocol = new TMultiplexedProtocol(
tProtocol, SentryHDFSServiceClient.SENTRY_HDFS_SERVICE_NAME);
- client = new SentryHDFSService.Client(protocol);
- LOGGER.info("Successfully created client");
+ client = new Client(protocol);
}
@Override
- public synchronized void notifyHMSUpdate(PathsUpdate update)
- throws SentryHdfsServiceException {
- try {
- client.handle_hms_notification(update.toThrift());
- } catch (Exception e) {
- throw new SentryHdfsServiceException("Thrift Exception occurred !!", e);
- }
- }
-
- @Override
- public synchronized long getLastSeenHMSPathSeqNum()
- throws SentryHdfsServiceException {
- try {
- return client.check_hms_seq_num(-1);
- } catch (Exception e) {
- throw new SentryHdfsServiceException("Thrift Exception occurred !!", e);
- }
- }
-
- @Override
- public synchronized SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum)
+ public SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum)
throws SentryHdfsServiceException {
SentryAuthzUpdate retVal = new SentryAuthzUpdate(new LinkedList<PermissionsUpdate>(), new LinkedList<PathsUpdate>());
try {
@@ -132,12 +104,23 @@
}
@Override
- public synchronized void close() {
- transportFactory.close();
+ public void close() {
+ done();
}
@Override
- public void disconnect() {
- transportFactory.releaseTransport();
+ public void done() {
+ if (transport != null) {
+ transportPool.returnTransport(transport);
+ transport = null;
+ }
+ }
+
+ @Override
+ public void invalidate() {
+ if (transport != null) {
+ transportPool.invalidateTransport(transport);
+ transport = null;
+ }
}
}
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
index e350103..fb34b0b 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
@@ -18,28 +18,107 @@
package org.apache.sentry.hdfs;
import java.lang.reflect.Proxy;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.sentry.core.common.transport.RetryClientInvocationHandler;
import org.apache.sentry.core.common.transport.SentryHDFSClientTransportConfig;
+import org.apache.sentry.core.common.transport.SentryTransportFactory;
+import org.apache.sentry.core.common.transport.SentryTransportPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.apache.sentry.core.common.utils.SentryConstants.KERBEROS_MODE;
/**
- * Client factory to create normal client or proxy with HA invocation handler
+ * Client factory for creating HDFS service clients.
+ * This is a singleton which uses a single factory.
*/
+@ThreadSafe
public class SentryHDFSServiceClientFactory {
- private final static SentryHDFSClientTransportConfig transportConfig =
- new SentryHDFSClientTransportConfig();
+ private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceClientFactory.class);
- private SentryHDFSServiceClientFactory() {
- // Make constructor private to avoid instantiation
+ private static final AtomicReference<SentryHDFSServiceClientFactory> clientFactory =
+ new AtomicReference<>();
+
+ private final SentryHDFSClientTransportConfig transportConfig =
+ new SentryHDFSClientTransportConfig();
+ private final Configuration conf;
+ private final SentryTransportPool transportPool;
+
+ /**
+ * Return a client instance
+ * @param conf
+ * @return
+ * @throws Exception
+ */
+ public static SentryHDFSServiceClient create(Configuration conf) throws Exception {
+ SentryHDFSServiceClientFactory factory = clientFactory.get();
+ if (factory != null) {
+ return factory.create();
+ }
+ factory = new SentryHDFSServiceClientFactory(conf);
+ boolean ok = clientFactory.compareAndSet(null, factory);
+ if (ok) {
+ return factory.create();
+ }
+ factory.close();
+ return clientFactory.get().create();
}
- public static SentryHDFSServiceClient create(Configuration conf)
- throws Exception {
+ private SentryHDFSServiceClientFactory(Configuration conf) {
+ Configuration clientConf = conf;
+
+ // When kerberos is enabled, UserGroupInformation should have been initialized with
+ // HADOOP_SECURITY_AUTHENTICATION property. There are instances where this is not done.
+ // Instead of depending on the callers to update this configuration and to be
+ // sure that UserGroupInformation is properly initialized, sentry client is explicitly
+ // doing it.
+ //
+ // This whole piece of code is a bit ugly but we want to avoid doing this in the transport
+ // code during connection establishment, so we are doing it upfront here instead.
+ boolean useKerberos = transportConfig.isKerberosEnabled(conf);
+
+ if (useKerberos) {
+ LOGGER.info("Using Kerberos authentication");
+ String authMode = conf.get(HADOOP_SECURITY_AUTHENTICATION, "");
+ if (authMode != KERBEROS_MODE) {
+ // Force auth mode to be Kerberos
+ clientConf = new Configuration(conf);
+ clientConf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS_MODE);
+ }
+ }
+
+ this.conf = clientConf;
+
+ boolean useUGI = transportConfig.useUserGroupInformation(conf);
+
+ if (useUGI) {
+ LOGGER.info("Using UserGroupInformation authentication");
+ UserGroupInformation.setConfiguration(this.conf);
+ }
+
+ transportPool = new SentryTransportPool(conf, transportConfig,
+ new SentryTransportFactory(conf, transportConfig));
+ }
+
+ private SentryHDFSServiceClient create() throws Exception {
return (SentryHDFSServiceClient) Proxy
.newProxyInstance(SentryHDFSServiceClientDefaultImpl.class.getClassLoader(),
SentryHDFSServiceClientDefaultImpl.class.getInterfaces(),
new RetryClientInvocationHandler(conf,
- new SentryHDFSServiceClientDefaultImpl(conf, transportConfig), transportConfig));
+ new SentryHDFSServiceClientDefaultImpl(conf, transportPool), transportConfig));
+ }
+
+ void close() {
+ try {
+ transportPool.close();
+ } catch (Exception e) {
+ LOGGER.error("failed to close transport pool", e);
+ }
}
}
diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/SentryHdfsServiceIntegrationBase.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/SentryHdfsServiceIntegrationBase.java
index eccf83b..3ee3724 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/SentryHdfsServiceIntegrationBase.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/SentryHdfsServiceIntegrationBase.java
@@ -44,7 +44,10 @@
@After
public void after() {
if (hdfsClient != null) {
- hdfsClient.close();
+ try {
+ hdfsClient.close();
+ } catch (Exception ignored) {
+ }
}
}
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java
index 75d2993..480991d 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java
@@ -77,9 +77,8 @@
int retries = Math.max(retryCount + 1, 1); // if customer configs retryCount as Integer.MAX_VALUE, try only once
while (retries > 0) {
retries--;
- SentryPolicyServiceClient policyServiceClient = null;
- try {
- policyServiceClient = SentryServiceClientFactory.create(conf);
+ try (SentryPolicyServiceClient policyServiceClient =
+ SentryServiceClientFactory.create(conf)) {
return ImmutableSet.copyOf(policyServiceClient.listPrivilegesForProvider(groups, users,
roleSet, authorizableHierarchy));
} catch (Exception e) {
@@ -97,10 +96,6 @@
LOGGER.info("Sleeping is interrupted.", e1);
}
}
- } finally {
- if(policyServiceClient != null) {
- policyServiceClient.close();
- }
}
}
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/SentryGenericProviderBackend.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/SentryGenericProviderBackend.java
index 134012d..6c7d3ef 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/SentryGenericProviderBackend.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/SentryGenericProviderBackend.java
@@ -82,6 +82,7 @@
} catch (NoSuchMethodException | ClassNotFoundException | InstantiationException | InvocationTargetException | IllegalAccessException e) {
throw new RuntimeException("Failed to create privilege converter of type " + privilegeConverter, e);
}
+ LOGGER.debug("Starting Updateable Cache");
UpdatableCache cache = new UpdatableCache(conf, getComponentType(), getServiceName(), sentryPrivilegeConverter);
try {
cache.startUpdateThread(true);
@@ -110,9 +111,7 @@
if (enableCaching) {
return super.getPrivileges(groups, roleSet, authorizableHierarchy);
} else {
- SentryGenericServiceClient client = null;
- try {
- client = getClient();
+ try (SentryGenericServiceClient client = getClient()){
return ImmutableSet.copyOf(client.listPrivilegesForProvider(componentType, serviceName,
roleSet, groups, Arrays.asList(authorizableHierarchy)));
} catch (SentryUserException e) {
@@ -121,10 +120,6 @@
} catch (Exception e) {
String msg = "Unable to obtain client:" + e.getMessage();
LOGGER.error(msg, e);
- } finally {
- if (client != null) {
- client.close();
- }
}
}
return ImmutableSet.of();
@@ -138,10 +133,8 @@
if (enableCaching) {
return super.getRoles(groups, roleSet);
} else {
- SentryGenericServiceClient client = null;
- try {
+ try (SentryGenericServiceClient client = getClient()){
Set<TSentryRole> tRoles = Sets.newHashSet();
- client = getClient();
//get the roles according to group
String requestor = UserGroupInformation.getCurrentUser().getShortUserName();
for (String group : groups) {
@@ -158,10 +151,6 @@
} catch (Exception e) {
String msg = "Unable to obtain client:" + e.getMessage();
LOGGER.error(msg, e);
- } finally {
- if (client != null) {
- client.close();
- }
}
return ImmutableSet.of();
}
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/UpdatableCache.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/UpdatableCache.java
index 41708c3..d20710f 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/UpdatableCache.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/UpdatableCache.java
@@ -29,9 +29,15 @@
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
-class UpdatableCache implements TableCache {
+public final class UpdatableCache implements TableCache, AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(UpdatableCache.class);
+ // Timer for getting updates periodically
+ private final Timer timer = new Timer();
+ private boolean initialized = false;
+ // saved timer is used by tests to cancel previous timer
+ private static Timer savedTimer;
+
private final String componentType;
private final String serviceName;
private final long cacheTtlNs;
@@ -94,14 +100,13 @@
String requestor;
requestor = UserGroupInformation.getLoginUser().getShortUserName();
- SentryGenericServiceClient client = null;
- try {
- client = getClient();
- final Set<TSentryRole> tSentryRoles = client.listAllRoles(requestor, componentType);
+ try(SentryGenericServiceClient client = getClient()) {
+ Set<TSentryRole> tSentryRoles = client.listAllRoles(requestor, componentType);
for (TSentryRole tSentryRole : tSentryRoles) {
final String roleName = tSentryRole.getRoleName();
- final Set<TSentryPrivilege> tSentryPrivileges = client.listPrivilegesByRoleName(requestor, roleName, componentType, serviceName);
+ final Set<TSentryPrivilege> tSentryPrivileges =
+ client.listPrivilegesByRoleName(requestor, roleName, componentType, serviceName);
for (String group : tSentryRole.getGroups()) {
Set<String> currentPrivileges = tempCache.get(group, roleName);
if (currentPrivileges == null) {
@@ -113,12 +118,8 @@
}
}
}
- } finally {
- if (client != null) {
- client.close();
- }
+ return tempCache;
}
- return tempCache;
}
/**
@@ -136,7 +137,19 @@
reloadData();
}
- Timer timer = new Timer();
+ if (initialized) {
+ LOGGER.info("Already initialized");
+ return;
+ }
+
+ initialized = true;
+ // Save timer to be able to cancel it.
+ if (savedTimer != null) {
+ LOGGER.debug("Resetting saved timer");
+ savedTimer.cancel();
+ }
+ savedTimer = timer;
+
final long refreshIntervalMs = TimeUnit.NANOSECONDS.toMillis(cacheTtlNs);
timer.scheduleAtFixedRate(
new TimerTask() {
@@ -158,6 +171,7 @@
private void revokeAllPrivilegesIfRequired() {
if (++consecutiveUpdateFailuresCount > allowedUpdateFailuresCount) {
+ consecutiveUpdateFailuresCount = 0;
// Clear cache to revoke all privileges.
// Update table cache to point to an empty table to avoid thread-unsafe characteristics of HashBasedTable.
this.table = HashBasedTable.create();
@@ -175,4 +189,21 @@
final long currentTimeNs = System.nanoTime();
return lastRefreshedNs + cacheTtlNs < currentTimeNs;
}
+
+ /**
+ * Only called by tests to disable timer.
+ */
+ public static void disable() {
+ if (savedTimer != null) {
+ LOGGER.info("Disabling timer");
+ savedTimer.cancel();
+ }
+ }
+
+ @Override
+ public void close() {
+ timer.cancel();
+ savedTimer = null;
+ LOGGER.info("Closed Updatable Cache");
+ }
}
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
index 11cdee7..246d0b4 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
@@ -25,7 +25,7 @@
import org.apache.sentry.core.common.ActiveRoleSet;
import org.apache.sentry.core.common.Authorizable;
-public interface SentryGenericServiceClient {
+public interface SentryGenericServiceClient extends AutoCloseable {
/**
* Create a sentry role
@@ -191,6 +191,4 @@
Map<String, TSentryPrivilegeMap> listPrivilegsbyAuthorizable(String component,
String serviceName, String requestorUserName, Set<String> authorizablesSet,
Set<String> groups, ActiveRoleSet roleSet) throws SentryUserException;
-
- void close();
}
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
index e23d13b..6301a6b 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
@@ -17,52 +17,63 @@
*/
package org.apache.sentry.provider.db.generic.service.thrift;
-import java.io.IOException;
-import java.util.*;
-
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
-
-import org.apache.sentry.core.common.exception.SentryUserException;
import org.apache.sentry.core.common.ActiveRoleSet;
import org.apache.sentry.core.common.Authorizable;
-import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig;
-import org.apache.sentry.core.common.transport.SentryServiceClient;
-import org.apache.sentry.core.common.transport.SentryTransportFactory;
+import org.apache.sentry.core.common.exception.SentryUserException;
+import org.apache.sentry.core.common.transport.SentryConnection;
+import org.apache.sentry.core.common.transport.SentryTransportPool;
+import org.apache.sentry.core.common.transport.TTransportWrapper;
import org.apache.sentry.core.model.db.AccessConstants;
-import org.apache.sentry.service.thrift.ServiceConstants;
+import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyService.Client;
+import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
import org.apache.sentry.service.thrift.Status;
import org.apache.sentry.service.thrift.sentry_common_serviceConstants;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TMultiplexedProtocol;
-import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
-import com.google.common.collect.Lists;
/**
- * Sentry Generic Service Client
+ * Sentry Generic Service Client.
* <p>
- * The public implementation of SentryGenericServiceClient.
- * TODO(kalyan) A Sentry Client in which all the operations are synchronized for thread safety
- * Note: When using this client, if there is an exception in RPC, socket can get into an inconsistent state.
- * So it is important to close and re-open the transportFactory so that new socket is used.
+ * Thread safety. This class is not thread safe - it is up to the
+ * caller to ensure thread safety.
*/
+public class SentryGenericServiceClientDefaultImpl
+ implements SentryGenericServiceClient, SentryConnection {
-public class SentryGenericServiceClientDefaultImpl implements SentryGenericServiceClient, SentryServiceClient {
- private SentryGenericPolicyService.Client client;
- private SentryTransportFactory transportFactory;
- private TTransport transport;
- private Configuration conf;
- private static final Logger LOGGER = LoggerFactory
- .getLogger(SentryGenericServiceClientDefaultImpl.class);
+ private Client client;
+ private final SentryTransportPool transportPool;
+ private TTransportWrapper transport;
private static final String THRIFT_EXCEPTION_MESSAGE = "Thrift exception occured ";
+ private final long maxMessageSize;
- public SentryGenericServiceClientDefaultImpl(Configuration conf, SentryPolicyClientTransportConfig transportConfig) throws IOException {
- transportFactory = new SentryTransportFactory(conf, transportConfig);
- this.conf = conf;
+ /**
+ * Initialize client with the given configuration, using specified transport pool
+ * implementation for obtaining transports.
+ * @param conf Sentry Configuration
+ * @param transportPool source of connected transports
+ */
+ SentryGenericServiceClientDefaultImpl(Configuration conf,
+ SentryTransportPool transportPool) {
+
+ //TODO(kalyan) need to find appropriate place to add it
+ // if (kerberos) {
+ // // since the client uses hadoop-auth, we need to set kerberos in
+ // // hadoop-auth if we plan to use kerberos
+ // conf.set(HADOOP_SECURITY_AUTHENTICATION, SentryConstants.KERBEROS_MoODE);
+ // }
+ maxMessageSize = conf.getLong(ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE,
+ ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
+ this.transportPool = transportPool;
}
/**
@@ -71,20 +82,18 @@
* @throws IOException
*/
@Override
- public synchronized void connect() throws IOException {
- if (transport != null && transport.isOpen()) {
+ public void connect() throws Exception {
+ if ((transport != null) && transport.isOpen()) {
return;
}
- transport = transportFactory.getTransport();
- TMultiplexedProtocol protocol = null;
- long maxMessageSize = conf.getLong(ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE,
- ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
- protocol = new TMultiplexedProtocol(
- new TBinaryProtocol(transport, maxMessageSize, maxMessageSize, true, true),
+ // Obtain connection to Sentry server
+ transport = transportPool.getTransport();
+ TMultiplexedProtocol protocol = new TMultiplexedProtocol(
+ new TBinaryProtocol(transport.getTTransport(), maxMessageSize,
+ maxMessageSize, true, true),
SentryGenericPolicyProcessor.SENTRY_GENERIC_SERVICE_NAME);
- client = new SentryGenericPolicyService.Client(protocol);
- LOGGER.debug("Successfully created client");
+ client = new Client(protocol);
}
/**
@@ -96,7 +105,7 @@
* @throws SentryUserException
*/
@Override
- public synchronized void createRole(String requestorUserName, String roleName, String component)
+ public void createRole(String requestorUserName, String roleName, String component)
throws SentryUserException {
TCreateSentryRoleRequest request = new TCreateSentryRoleRequest();
request.setProtocol_version(sentry_common_serviceConstants.TSENTRY_SERVICE_V2);
@@ -359,7 +368,7 @@
* @throws SentryUserException
*/
@Override
- public synchronized Set<TSentryRole> listRolesByGroupName(
+ public Set<TSentryRole> listRolesByGroupName(
String requestorUserName,
String groupName,
String component)
@@ -528,12 +537,23 @@
}
@Override
- public synchronized void close() {
- transportFactory.close();
+ public void close() {
+ done();
}
@Override
- public void disconnect() {
- transportFactory.releaseTransport();
+ public void done() {
+ if (transport != null) {
+ transportPool.returnTransport(transport);
+ transport = null;
+ }
+ }
+
+ @Override
+ public void invalidate() {
+ if (transport != null) {
+ transportPool.invalidateTransport(transport);
+ transport = null;
+ }
}
}
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
index 46ac4a3..1b47236 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
@@ -18,27 +18,131 @@
package org.apache.sentry.provider.db.generic.service.thrift;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.sentry.core.common.transport.RetryClientInvocationHandler;
import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig;
+import org.apache.sentry.core.common.transport.SentryTransportFactory;
+import org.apache.sentry.core.common.transport.SentryTransportPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.annotation.concurrent.ThreadSafe;
import java.lang.reflect.Proxy;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.apache.sentry.core.common.utils.SentryConstants.KERBEROS_MODE;
/**
- * SentryGenericServiceClientFactory is a public class for the components which using Generic Model to create sentry client.
+ * Produces client connection for Sentry clients using Generic model.
+ * Factory is [alost] a singleton. Tests can call {@link #factoryReset()} to destroy the
+ * existing factory and create a new one. This may be needed because tests modify
+ * configuration and start and stop servers.
*/
+@ThreadSafe
public final class SentryGenericServiceClientFactory {
- private static final SentryPolicyClientTransportConfig transportConfig =
- new SentryPolicyClientTransportConfig();
+ private static final Logger LOGGER = LoggerFactory.getLogger(SentryGenericServiceClientFactory.class);
- private SentryGenericServiceClientFactory() {
+ // Used to implement a singleton
+ private static final AtomicReference<SentryGenericServiceClientFactory> clientFactory =
+ new AtomicReference<>();
+
+ private final SentryPolicyClientTransportConfig transportConfig =
+ new SentryPolicyClientTransportConfig();
+ private final SentryTransportPool transportPool;
+ private final Configuration conf;
+
+ /**
+ * Obtain an Generic policy client instance.
+ * @param conf Configuration that should be used. Configuration is only used for the
+ * initial creation and ignored afterwords.
+ */
+ public static SentryGenericServiceClient create(Configuration conf) throws Exception {
+ SentryGenericServiceClientFactory factory = clientFactory.get();
+ if (factory != null) {
+ return factory.create();
+ }
+ factory = new SentryGenericServiceClientFactory(conf);
+ boolean ok = clientFactory.compareAndSet(null, factory);
+ if (ok) {
+ return factory.create();
+ }
+ factory.close();
+ return clientFactory.get().create();
}
- public static SentryGenericServiceClient create(Configuration conf) throws Exception {
+ /**
+ * Create a new factory instance and atach it to a connection pool instance.
+ * @param conf Configuration
+ */
+ private SentryGenericServiceClientFactory(Configuration conf) {
+ Configuration clientConf = conf;
+
+ // When kerberos is enabled, UserGroupInformation should have been initialized with
+ // HADOOP_SECURITY_AUTHENTICATION property. There are instances where this is not done.
+ // Instead of depending on the callers to update this configuration and to be
+ // sure that UserGroupInformation is properly initialized, sentry client is explicitly
+ // doing it.
+ //
+ // This whole piece of code is a bit ugly but we want to avoid doing this in the transport
+ // code during connection establishment, so we are doing it upfront here instead.
+ boolean useKerberos = transportConfig.isKerberosEnabled(conf);
+
+ if (useKerberos) {
+ LOGGER.info("Using Kerberos authentication");
+ String authMode = conf.get(HADOOP_SECURITY_AUTHENTICATION, "");
+ if (authMode != KERBEROS_MODE) {
+ // Force auth mode to be Kerberos
+ clientConf = new Configuration(conf);
+ clientConf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS_MODE);
+ }
+ }
+
+ this.conf = clientConf;
+
+ boolean useUGI = transportConfig.useUserGroupInformation(conf);
+
+ if (useUGI) {
+ LOGGER.info("Using UserGroupInformation authentication");
+ UserGroupInformation.setConfiguration(this.conf);
+ }
+
+ transportPool = new SentryTransportPool(conf, transportConfig,
+ new SentryTransportFactory(conf, transportConfig));
+ }
+
+ /**
+ * Create a new client connection to the server for Generic model clients
+ * @return client instance
+ * @throws Exception if something goes wrong
+ */
+ private SentryGenericServiceClient create() throws Exception {
return (SentryGenericServiceClient) Proxy
.newProxyInstance(SentryGenericServiceClientDefaultImpl.class.getClassLoader(),
SentryGenericServiceClientDefaultImpl.class.getInterfaces(),
new RetryClientInvocationHandler(conf,
- new SentryGenericServiceClientDefaultImpl(conf, transportConfig), transportConfig));
+ new SentryGenericServiceClientDefaultImpl(conf, transportPool), transportConfig));
}
+ // Should only be used by tests.
+ // Resets the factory and destroys any pooled connections
+ public static void factoryReset() {
+ LOGGER.debug("factory reset");
+ SentryGenericServiceClientFactory factory = clientFactory.getAndSet(null);
+ if (factory != null) {
+ try {
+ factory.transportPool.close();
+ } catch (Exception e) {
+ LOGGER.error("failed to close transport pool", e);
+ }
+ }
+ }
+
+ void close() {
+ try {
+ transportPool.close();
+ } catch (Exception e) {
+ LOGGER.error("failed to close transport pool", e);
+ }
+ }
}
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryConfigToolSolr.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryConfigToolSolr.java
index 404adb8..b958b09 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryConfigToolSolr.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryConfigToolSolr.java
@@ -62,12 +62,14 @@
String service = conf.get(SOLR_SERVICE_NAME, "service1");
// instantiate a solr client for sentry service. This sets the ugi, so must
// be done before getting the ugi below.
- SentryGenericServiceClient client = SentryGenericServiceClientFactory.create(conf);
- UserGroupInformation ugi = UserGroupInformation.getLoginUser();
- String requestorName = ugi.getShortUserName();
+ try(SentryGenericServiceClient client =
+ SentryGenericServiceClientFactory.create(conf)) {
+ UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+ String requestorName = ugi.getShortUserName();
- convertINIToSentryServiceCmds(component, service, requestorName, conf, client,
- getPolicyFile(), getValidate(), getImportPolicy(), getCheckCompat());
+ convertINIToSentryServiceCmds(component, service, requestorName, conf, client,
+ getPolicyFile(), getValidate(), getImportPolicy(), getCheckCompat());
+ }
}
private Configuration getSentryConf() {
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellKafka.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellKafka.java
index d6d9014..f6e5d1b 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellKafka.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellKafka.java
@@ -48,39 +48,41 @@
Configuration conf = getSentryConf();
String service = conf.get(KAFKA_SERVICE_NAME, "kafka1");
- SentryGenericServiceClient client = SentryGenericServiceClientFactory.create(conf);
- UserGroupInformation ugi = UserGroupInformation.getLoginUser();
- String requestorName = ugi.getShortUserName();
+ try(SentryGenericServiceClient client =
+ SentryGenericServiceClientFactory.create(conf)) {
+ UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+ String requestorName = ugi.getShortUserName();
- if (isCreateRole) {
- command = new CreateRoleCmd(roleName, component);
- } else if (isDropRole) {
- command = new DropRoleCmd(roleName, component);
- } else if (isAddRoleGroup) {
- command = new AddRoleToGroupCmd(roleName, groupName, component);
- } else if (isDeleteRoleGroup) {
- command = new DeleteRoleFromGroupCmd(roleName, groupName, component);
- } else if (isGrantPrivilegeRole) {
- command = new GrantPrivilegeToRoleCmd(roleName, component,
- privilegeStr, new KafkaTSentryPrivilegeConverter(component, service));
- } else if (isRevokePrivilegeRole) {
- command = new RevokePrivilegeFromRoleCmd(roleName, component,
- privilegeStr, new KafkaTSentryPrivilegeConverter(component, service));
- } else if (isListRole) {
- command = new ListRolesCmd(groupName, component);
- } else if (isListPrivilege) {
- command = new ListPrivilegesByRoleCmd(roleName, component,
- service, new KafkaTSentryPrivilegeConverter(component, service));
- }
+ if (isCreateRole) {
+ command = new CreateRoleCmd(roleName, component);
+ } else if (isDropRole) {
+ command = new DropRoleCmd(roleName, component);
+ } else if (isAddRoleGroup) {
+ command = new AddRoleToGroupCmd(roleName, groupName, component);
+ } else if (isDeleteRoleGroup) {
+ command = new DeleteRoleFromGroupCmd(roleName, groupName, component);
+ } else if (isGrantPrivilegeRole) {
+ command = new GrantPrivilegeToRoleCmd(roleName, component,
+ privilegeStr, new KafkaTSentryPrivilegeConverter(component, service));
+ } else if (isRevokePrivilegeRole) {
+ command = new RevokePrivilegeFromRoleCmd(roleName, component,
+ privilegeStr, new KafkaTSentryPrivilegeConverter(component, service));
+ } else if (isListRole) {
+ command = new ListRolesCmd(groupName, component);
+ } else if (isListPrivilege) {
+ command = new ListPrivilegesByRoleCmd(roleName, component,
+ service, new KafkaTSentryPrivilegeConverter(component, service));
+ }
- // check the requestor name
- if (StringUtils.isEmpty(requestorName)) {
- // The exception message will be recorded in log file.
- throw new Exception("The requestor name is empty.");
- }
+ // check the requestor name
+ if (StringUtils.isEmpty(requestorName)) {
+ // The exception message will be recorded in log file.
+ throw new Exception("The requestor name is empty.");
+ }
- if (command != null) {
- command.execute(client, requestorName);
+ if (command != null) {
+ command.execute(client, requestorName);
+ }
}
}
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellSolr.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellSolr.java
index 695c008..5385f7d 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellSolr.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellSolr.java
@@ -47,39 +47,41 @@
Configuration conf = getSentryConf();
String service = conf.get(SOLR_SERVICE_NAME, "service1");
- SentryGenericServiceClient client = SentryGenericServiceClientFactory.create(conf);
- UserGroupInformation ugi = UserGroupInformation.getLoginUser();
- String requestorName = ugi.getShortUserName();
+ try(SentryGenericServiceClient client =
+ SentryGenericServiceClientFactory.create(conf)) {
+ UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+ String requestorName = ugi.getShortUserName();
- if (isCreateRole) {
- command = new CreateRoleCmd(roleName, component);
- } else if (isDropRole) {
- command = new DropRoleCmd(roleName, component);
- } else if (isAddRoleGroup) {
- command = new AddRoleToGroupCmd(roleName, groupName, component);
- } else if (isDeleteRoleGroup) {
- command = new DeleteRoleFromGroupCmd(roleName, groupName, component);
- } else if (isGrantPrivilegeRole) {
- command = new GrantPrivilegeToRoleCmd(roleName, component,
- privilegeStr, new SolrTSentryPrivilegeConverter(component, service));
- } else if (isRevokePrivilegeRole) {
- command = new RevokePrivilegeFromRoleCmd(roleName, component,
- privilegeStr, new SolrTSentryPrivilegeConverter(component, service));
- } else if (isListRole) {
- command = new ListRolesCmd(groupName, component);
- } else if (isListPrivilege) {
- command = new ListPrivilegesByRoleCmd(roleName, component,
- service, new SolrTSentryPrivilegeConverter(component, service));
- }
+ if (isCreateRole) {
+ command = new CreateRoleCmd(roleName, component);
+ } else if (isDropRole) {
+ command = new DropRoleCmd(roleName, component);
+ } else if (isAddRoleGroup) {
+ command = new AddRoleToGroupCmd(roleName, groupName, component);
+ } else if (isDeleteRoleGroup) {
+ command = new DeleteRoleFromGroupCmd(roleName, groupName, component);
+ } else if (isGrantPrivilegeRole) {
+ command = new GrantPrivilegeToRoleCmd(roleName, component,
+ privilegeStr, new SolrTSentryPrivilegeConverter(component, service));
+ } else if (isRevokePrivilegeRole) {
+ command = new RevokePrivilegeFromRoleCmd(roleName, component,
+ privilegeStr, new SolrTSentryPrivilegeConverter(component, service));
+ } else if (isListRole) {
+ command = new ListRolesCmd(groupName, component);
+ } else if (isListPrivilege) {
+ command = new ListPrivilegesByRoleCmd(roleName, component,
+ service, new SolrTSentryPrivilegeConverter(component, service));
+ }
- // check the requestor name
- if (StringUtils.isEmpty(requestorName)) {
- // The exception message will be recorded in log file.
- throw new Exception("The requestor name is empty.");
- }
+ // check the requestor name
+ if (StringUtils.isEmpty(requestorName)) {
+ // The exception message will be recorded in log file.
+ throw new Exception("The requestor name is empty.");
+ }
- if (command != null) {
- command.execute(client, requestorName);
+ if (command != null) {
+ command.execute(client, requestorName);
+ }
}
}
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
index c2b03e5..fb8036f 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
@@ -26,7 +26,7 @@
import org.apache.sentry.core.common.ActiveRoleSet;
import org.apache.sentry.core.common.Authorizable;
-public interface SentryPolicyServiceClient {
+public interface SentryPolicyServiceClient extends AutoCloseable {
void createRole(String requestorUserName, String roleName) throws SentryUserException;
@@ -208,8 +208,6 @@
*/
String getConfigValue(String propertyName, String defaultValue) throws SentryUserException;
- void close();
-
// Import the sentry mapping data with map structure
void importPolicy(Map<String, Map<String, Set<String>>> policyFileMappingData,
String requestorUserName, boolean isOverwriteRole) throws SentryUserException;
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
index d1a4d99..b5b8f82 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
@@ -18,78 +18,62 @@
package org.apache.sentry.provider.db.service.thrift;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Collections;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.sentry.core.common.exception.SentryUserException;
-import org.apache.sentry.core.common.ActiveRoleSet;
-import org.apache.sentry.core.common.Authorizable;
-import org.apache.sentry.core.common.transport.SentryTransportFactory;
-import org.apache.sentry.core.model.db.AccessConstants;
-import org.apache.sentry.core.model.db.DBModelAuthorizable;
-import org.apache.sentry.core.common.utils.PolicyFileConstants;
-import org.apache.sentry.service.thrift.SentryServiceUtil;
-import org.apache.sentry.service.thrift.ServiceConstants;
-import org.apache.sentry.service.thrift.ServiceConstants.PrivilegeScope;
-import org.apache.sentry.service.thrift.ServiceConstants.ThriftConstants;
-import org.apache.sentry.service.thrift.Status;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TMultiplexedProtocol;
-import org.apache.sentry.core.common.transport.SentryServiceClient;
-import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig;
-import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.core.common.ActiveRoleSet;
+import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.core.common.exception.SentryUserException;
+import org.apache.sentry.core.common.transport.SentryConnection;
+import org.apache.sentry.core.common.transport.SentryTransportPool;
+import org.apache.sentry.core.common.transport.TTransportWrapper;
+import org.apache.sentry.core.common.utils.PolicyFileConstants;
+import org.apache.sentry.core.model.db.AccessConstants;
+import org.apache.sentry.core.model.db.DBModelAuthorizable;
+import org.apache.sentry.provider.db.service.thrift.SentryPolicyService.Client;
+import org.apache.sentry.service.thrift.SentryServiceUtil;
+import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
+import org.apache.sentry.service.thrift.ServiceConstants.PrivilegeScope;
+import org.apache.sentry.service.thrift.ServiceConstants.ThriftConstants;
+import org.apache.sentry.service.thrift.Status;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TMultiplexedProtocol;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
/**
- * Sentry Policy Service Client
+ * Client implementation for Policy (HMS) clients.
* <p>
- * The public implementation of SentryPolicyServiceClient.
- * Note: When using this client, if there is an exception in RPC, socket can get into an inconsistent state
- * So it is important to close and re-open the transportFactory so that new socket is used.
- * When an class is instantiated, there will be transportFactory created connecting with first available
- * server this is configured.
+ * The class is not thread-safe - it is up to the callers to ensure thread safety
*/
-public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyServiceClient, SentryServiceClient {
+public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyServiceClient, SentryConnection {
- private SentryPolicyService.Client client;
- private SentryTransportFactory transportFactory;
- private TTransport transport;
- private Configuration conf;
+ private Client client;
+ private final SentryTransportPool transportPool;
+ private TTransportWrapper transport;
+ private final long maxMessageSize;
- private static final Logger LOGGER = LoggerFactory
- .getLogger(SentryPolicyServiceClient.class);
private static final String THRIFT_EXCEPTION_MESSAGE = "Thrift exception occurred ";
/**
* Initialize the sentry configurations.
*/
- public SentryPolicyServiceClientDefaultImpl(Configuration conf, SentryPolicyClientTransportConfig transportConfig)
+ public SentryPolicyServiceClientDefaultImpl(Configuration conf,
+ SentryTransportPool transportPool)
throws IOException {
- transportFactory = new SentryTransportFactory(conf, transportConfig);
- this.conf = conf;
- }
-
- public SentryPolicyServiceClientDefaultImpl(String addr, int port,
- Configuration conf) throws IOException {
- transportFactory = new SentryTransportFactory(addr, port, conf,
- new SentryPolicyClientTransportConfig());
- this.conf = conf;
- connect();
+ maxMessageSize = conf.getLong(ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE,
+ ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
+ this.transportPool = transportPool;
}
/**
@@ -98,24 +82,21 @@
* @throws IOException
*/
@Override
- public synchronized void connect() throws IOException {
- if (transport != null && transport.isOpen()) {
+ public void connect() throws Exception {
+ if ((transport != null) && transport.isOpen()) {
return;
}
- transport = transportFactory.getTransport();
- long maxMessageSize = conf.getLong(
- ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE,
- ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
+ transport = transportPool.getTransport();
TMultiplexedProtocol protocol = new TMultiplexedProtocol(
- new TBinaryProtocol(transport, maxMessageSize, maxMessageSize, true, true),
- SentryPolicyStoreProcessor.SENTRY_POLICY_SERVICE_NAME);
- client = new SentryPolicyService.Client(protocol);
- LOGGER.debug("Successfully created client");
+ new TBinaryProtocol(transport.getTTransport(), maxMessageSize, maxMessageSize,
+ true, true),
+ SentryPolicyStoreProcessor.SENTRY_POLICY_SERVICE_NAME);
+ client = new Client(protocol);
}
@Override
- public synchronized void createRole(String requestorUserName, String roleName)
+ public void createRole(String requestorUserName, String roleName)
throws SentryUserException {
TCreateSentryRoleRequest request = new TCreateSentryRoleRequest();
request.setProtocol_version(ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT);
@@ -130,20 +111,20 @@
}
@Override
- public synchronized void dropRole(String requestorUserName,
+ public void dropRole(String requestorUserName,
String roleName)
throws SentryUserException {
dropRole(requestorUserName, roleName, false);
}
@Override
- public synchronized void dropRoleIfExists(String requestorUserName,
+ public void dropRoleIfExists(String requestorUserName,
String roleName)
throws SentryUserException {
dropRole(requestorUserName, roleName, true);
}
- private synchronized void dropRole(String requestorUserName,
+ private void dropRole(String requestorUserName,
String roleName, boolean ifExists)
throws SentryUserException {
TDropSentryRoleRequest request = new TDropSentryRoleRequest();
@@ -171,7 +152,7 @@
* @throws SentryUserException
*/
@Override
- public synchronized Set<TSentryRole> listRolesByGroupName(
+ public Set<TSentryRole> listRolesByGroupName(
String requestorUserName,
String groupName)
throws SentryUserException {
@@ -219,7 +200,7 @@
}
@Override
- public synchronized Set<TSentryPrivilege> listAllPrivilegesByRoleName(String requestorUserName,
+ public Set<TSentryPrivilege> listAllPrivilegesByRoleName(String requestorUserName,
String roleName)
throws SentryUserException {
return listPrivilegesByRoleName(requestorUserName, roleName, null);
@@ -235,7 +216,7 @@
* @throws SentryUserException
*/
@Override
- public synchronized Set<TSentryPrivilege> listPrivilegesByRoleName(String requestorUserName,
+ public Set<TSentryPrivilege> listPrivilegesByRoleName(String requestorUserName,
String roleName, List<? extends Authorizable> authorizable)
throws SentryUserException {
TListSentryPrivilegesRequest request = new TListSentryPrivilegesRequest();
@@ -257,13 +238,13 @@
}
@Override
- public synchronized Set<TSentryRole> listRoles(String requestorUserName)
+ public Set<TSentryRole> listRoles(String requestorUserName)
throws SentryUserException {
return listRolesByGroupName(requestorUserName, null);
}
@Override
- public synchronized Set<TSentryRole> listUserRoles(String requestorUserName)
+ public Set<TSentryRole> listUserRoles(String requestorUserName)
throws SentryUserException {
Set<TSentryRole> tSentryRoles = Sets.newHashSet();
tSentryRoles.addAll(listRolesByGroupName(requestorUserName, AccessConstants.ALL));
@@ -272,7 +253,7 @@
}
@Override
- public synchronized TSentryPrivilege grantURIPrivilege(String requestorUserName,
+ public TSentryPrivilege grantURIPrivilege(String requestorUserName,
String roleName, String server, String uri)
throws SentryUserException {
return grantPrivilege(requestorUserName, roleName,
@@ -280,7 +261,7 @@
}
@Override
- public synchronized TSentryPrivilege grantURIPrivilege(String requestorUserName,
+ public TSentryPrivilege grantURIPrivilege(String requestorUserName,
String roleName, String server, String uri, Boolean grantOption)
throws SentryUserException {
return grantPrivilege(requestorUserName, roleName,
@@ -288,7 +269,7 @@
}
@Override
- public synchronized void grantServerPrivilege(String requestorUserName,
+ public void grantServerPrivilege(String requestorUserName,
String roleName, String server, String action)
throws SentryUserException {
@@ -307,14 +288,14 @@
* Should use grantServerPrivilege(String requestorUserName,
* String roleName, String server, String action, Boolean grantOption)
*/
- public synchronized TSentryPrivilege grantServerPrivilege(String requestorUserName,
+ public TSentryPrivilege grantServerPrivilege(String requestorUserName,
String roleName, String server, Boolean grantOption) throws SentryUserException {
return grantServerPrivilege(requestorUserName, roleName, server,
AccessConstants.ALL, grantOption);
}
@Override
- public synchronized TSentryPrivilege grantServerPrivilege(String requestorUserName,
+ public TSentryPrivilege grantServerPrivilege(String requestorUserName,
String roleName, String server, String action, Boolean grantOption)
throws SentryUserException {
@@ -329,7 +310,7 @@
}
@Override
- public synchronized TSentryPrivilege grantDatabasePrivilege(String requestorUserName,
+ public TSentryPrivilege grantDatabasePrivilege(String requestorUserName,
String roleName, String server, String db, String action)
throws SentryUserException {
return grantPrivilege(requestorUserName, roleName,
@@ -337,7 +318,7 @@
}
@Override
- public synchronized TSentryPrivilege grantDatabasePrivilege(String requestorUserName,
+ public TSentryPrivilege grantDatabasePrivilege(String requestorUserName,
String roleName, String server, String db, String action, Boolean grantOption)
throws SentryUserException {
return grantPrivilege(requestorUserName, roleName,
@@ -345,7 +326,7 @@
}
@Override
- public synchronized TSentryPrivilege grantTablePrivilege(String requestorUserName,
+ public TSentryPrivilege grantTablePrivilege(String requestorUserName,
String roleName, String server, String db, String table, String action)
throws SentryUserException {
return grantPrivilege(requestorUserName, roleName, PrivilegeScope.TABLE, server,
@@ -354,7 +335,7 @@
}
@Override
- public synchronized TSentryPrivilege grantTablePrivilege(String requestorUserName,
+ public TSentryPrivilege grantTablePrivilege(String requestorUserName,
String roleName, String server, String db, String table, String action, Boolean grantOption)
throws SentryUserException {
return grantPrivilege(requestorUserName, roleName, PrivilegeScope.TABLE, server,
@@ -362,7 +343,7 @@
}
@Override
- public synchronized TSentryPrivilege grantColumnPrivilege(String requestorUserName,
+ public TSentryPrivilege grantColumnPrivilege(String requestorUserName,
String roleName, String server, String db, String table, String columnName, String action)
throws SentryUserException {
return grantPrivilege(requestorUserName, roleName, PrivilegeScope.COLUMN, server,
@@ -371,7 +352,7 @@
}
@Override
- public synchronized TSentryPrivilege grantColumnPrivilege(String requestorUserName,
+ public TSentryPrivilege grantColumnPrivilege(String requestorUserName,
String roleName, String server, String db, String table, String columnName, String action, Boolean grantOption)
throws SentryUserException {
return grantPrivilege(requestorUserName, roleName, PrivilegeScope.COLUMN, server,
@@ -379,7 +360,7 @@
}
@Override
- public synchronized Set<TSentryPrivilege> grantColumnsPrivileges(String requestorUserName,
+ public Set<TSentryPrivilege> grantColumnsPrivileges(String requestorUserName,
String roleName, String server, String db, String table, List<String> columnNames, String action)
throws SentryUserException {
return grantPrivileges(requestorUserName, roleName, PrivilegeScope.COLUMN, server,
@@ -388,7 +369,7 @@
}
@Override
- public synchronized Set<TSentryPrivilege> grantColumnsPrivileges(String requestorUserName,
+ public Set<TSentryPrivilege> grantColumnsPrivileges(String requestorUserName,
String roleName, String server, String db, String table, List<String> columnNames, String action, Boolean grantOption)
throws SentryUserException {
return grantPrivileges(requestorUserName, roleName, PrivilegeScope.COLUMN,
@@ -397,14 +378,14 @@
}
@Override
- public synchronized Set<TSentryPrivilege> grantPrivileges(
+ public Set<TSentryPrivilege> grantPrivileges(
String requestorUserName, String roleName,
Set<TSentryPrivilege> privileges) throws SentryUserException {
return grantPrivilegesCore(requestorUserName, roleName, privileges);
}
@Override
- public synchronized TSentryPrivilege grantPrivilege(String requestorUserName, String roleName,
+ public TSentryPrivilege grantPrivilege(String requestorUserName, String roleName,
TSentryPrivilege privilege) throws SentryUserException {
return grantPrivilegeCore(requestorUserName, roleName, privilege);
}
@@ -500,12 +481,12 @@
}
@Override
- public synchronized void revokePrivileges(String requestorUserName, String roleName, Set<TSentryPrivilege> privileges) throws SentryUserException {
+ public void revokePrivileges(String requestorUserName, String roleName, Set<TSentryPrivilege> privileges) throws SentryUserException {
this.revokePrivilegesCore(requestorUserName, roleName, privileges);
}
@Override
- public synchronized void revokePrivilege(String requestorUserName, String roleName, TSentryPrivilege privilege) throws SentryUserException {
+ public void revokePrivilege(String requestorUserName, String roleName, TSentryPrivilege privilege) throws SentryUserException {
this.revokePrivilegeCore(requestorUserName, roleName, privilege);
}
@@ -530,7 +511,7 @@
}
@Override
- public synchronized void revokeURIPrivilege(String requestorUserName,
+ public void revokeURIPrivilege(String requestorUserName,
String roleName, String server, String uri)
throws SentryUserException {
revokePrivilege(requestorUserName, roleName,
@@ -538,7 +519,7 @@
}
@Override
- public synchronized void revokeURIPrivilege(String requestorUserName,
+ public void revokeURIPrivilege(String requestorUserName,
String roleName, String server, String uri, Boolean grantOption)
throws SentryUserException {
revokePrivilege(requestorUserName, roleName,
@@ -546,7 +527,7 @@
}
@Override
- public synchronized void revokeServerPrivilege(String requestorUserName,
+ public void revokeServerPrivilege(String requestorUserName,
String roleName, String server, String action)
throws SentryUserException {
@@ -560,7 +541,7 @@
PrivilegeScope.SERVER, server, null, null, null, null, action);
}
- public synchronized void revokeServerPrivilege(String requestorUserName,
+ public void revokeServerPrivilege(String requestorUserName,
String roleName, String server, String action, Boolean grantOption)
throws SentryUserException {
@@ -580,7 +561,7 @@
* String roleName, String server, String action, Boolean grantOption)
*/
@Override
- public synchronized void revokeServerPrivilege(String requestorUserName,
+ public void revokeServerPrivilege(String requestorUserName,
String roleName, String server, boolean grantOption)
throws SentryUserException {
revokePrivilege(requestorUserName, roleName,
@@ -588,7 +569,7 @@
}
@Override
- public synchronized void revokeDatabasePrivilege(String requestorUserName,
+ public void revokeDatabasePrivilege(String requestorUserName,
String roleName, String server, String db, String action)
throws SentryUserException {
revokePrivilege(requestorUserName, roleName,
@@ -596,7 +577,7 @@
}
@Override
- public synchronized void revokeDatabasePrivilege(String requestorUserName,
+ public void revokeDatabasePrivilege(String requestorUserName,
String roleName, String server, String db, String action, Boolean grantOption)
throws SentryUserException {
revokePrivilege(requestorUserName, roleName,
@@ -604,7 +585,7 @@
}
@Override
- public synchronized void revokeTablePrivilege(String requestorUserName,
+ public void revokeTablePrivilege(String requestorUserName,
String roleName, String server, String db, String table, String action)
throws SentryUserException {
revokePrivilege(requestorUserName, roleName,
@@ -613,7 +594,7 @@
}
@Override
- public synchronized void revokeTablePrivilege(String requestorUserName,
+ public void revokeTablePrivilege(String requestorUserName,
String roleName, String server, String db, String table, String action, Boolean grantOption)
throws SentryUserException {
revokePrivilege(requestorUserName, roleName,
@@ -622,7 +603,7 @@
}
@Override
- public synchronized void revokeColumnPrivilege(String requestorUserName, String roleName,
+ public void revokeColumnPrivilege(String requestorUserName, String roleName,
String server, String db, String table, String columnName, String action)
throws SentryUserException {
ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
@@ -633,7 +614,7 @@
}
@Override
- public synchronized void revokeColumnPrivilege(String requestorUserName, String roleName,
+ public void revokeColumnPrivilege(String requestorUserName, String roleName,
String server, String db, String table, String columnName, String action, Boolean grantOption)
throws SentryUserException {
ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
@@ -644,7 +625,7 @@
}
@Override
- public synchronized void revokeColumnsPrivilege(String requestorUserName, String roleName,
+ public void revokeColumnsPrivilege(String requestorUserName, String roleName,
String server, String db, String table, List<String> columns, String action)
throws SentryUserException {
revokePrivilege(requestorUserName, roleName,
@@ -653,7 +634,7 @@
}
@Override
- public synchronized void revokeColumnsPrivilege(String requestorUserName, String roleName,
+ public void revokeColumnsPrivilege(String requestorUserName, String roleName,
String server, String db, String table, List<String> columns, String action, Boolean grantOption)
throws SentryUserException {
revokePrivilege(requestorUserName, roleName,
@@ -741,7 +722,7 @@
}
@Override
- public synchronized Set<String> listPrivilegesForProvider
+ public Set<String> listPrivilegesForProvider
(Set<String> groups, Set<String> users,
ActiveRoleSet roleSet, Authorizable... authorizable) throws SentryUserException {
TSentryActiveRoleSet thriftRoleSet = new TSentryActiveRoleSet(roleSet.isAll(), roleSet.getRoles());
@@ -766,21 +747,21 @@
}
@Override
- public synchronized void grantRoleToGroup(String requestorUserName,
+ public void grantRoleToGroup(String requestorUserName,
String groupName, String roleName)
throws SentryUserException {
grantRoleToGroups(requestorUserName, roleName, Sets.newHashSet(groupName));
}
@Override
- public synchronized void revokeRoleFromGroup(String requestorUserName,
+ public void revokeRoleFromGroup(String requestorUserName,
String groupName, String roleName)
throws SentryUserException {
revokeRoleFromGroups(requestorUserName, roleName, Sets.newHashSet(groupName));
}
@Override
- public synchronized void grantRoleToGroups(String requestorUserName,
+ public void grantRoleToGroups(String requestorUserName,
String roleName, Set<String> groups)
throws SentryUserException {
TAlterSentryRoleAddGroupsRequest request = new TAlterSentryRoleAddGroupsRequest(
@@ -795,7 +776,7 @@
}
@Override
- public synchronized void revokeRoleFromGroups(String requestorUserName,
+ public void revokeRoleFromGroups(String requestorUserName,
String roleName, Set<String> groups)
throws SentryUserException {
TAlterSentryRoleDeleteGroupsRequest request = new TAlterSentryRoleDeleteGroupsRequest(
@@ -810,19 +791,19 @@
}
@Override
- public synchronized void grantRoleToUser(String requestorUserName, String userName,
+ public void grantRoleToUser(String requestorUserName, String userName,
String roleName) throws SentryUserException {
grantRoleToUsers(requestorUserName, roleName, Sets.newHashSet(userName));
}
@Override
- public synchronized void revokeRoleFromUser(String requestorUserName, String userName,
+ public void revokeRoleFromUser(String requestorUserName, String userName,
String roleName) throws SentryUserException {
revokeRoleFromUsers(requestorUserName, roleName, Sets.newHashSet(userName));
}
@Override
- public synchronized void grantRoleToUsers(String requestorUserName, String roleName,
+ public void grantRoleToUsers(String requestorUserName, String roleName,
Set<String> users) throws SentryUserException {
TAlterSentryRoleAddUsersRequest request = new TAlterSentryRoleAddUsersRequest(
ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, requestorUserName, roleName, users);
@@ -835,7 +816,7 @@
}
@Override
- public synchronized void revokeRoleFromUsers(String requestorUserName, String roleName,
+ public void revokeRoleFromUsers(String requestorUserName, String roleName,
Set<String> users) throws SentryUserException {
TAlterSentryRoleDeleteUsersRequest request = new TAlterSentryRoleDeleteUsersRequest(
ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, requestorUserName, roleName, users);
@@ -858,7 +839,7 @@
}
@Override
- public synchronized void dropPrivileges(String requestorUserName,
+ public void dropPrivileges(String requestorUserName,
List<? extends Authorizable> authorizableObjects)
throws SentryUserException {
TSentryAuthorizable tSentryAuthorizable = setupSentryAuthorizable(authorizableObjects);
@@ -875,7 +856,7 @@
}
@Override
- public synchronized void renamePrivileges(String requestorUserName,
+ public void renamePrivileges(String requestorUserName,
List<? extends Authorizable> oldAuthorizables,
List<? extends Authorizable> newAuthorizables) throws SentryUserException {
TSentryAuthorizable tOldSentryAuthorizable = setupSentryAuthorizable(oldAuthorizables);
@@ -894,7 +875,7 @@
}
@Override
- public synchronized Map<TSentryAuthorizable, TSentryPrivilegeMap> listPrivilegsbyAuthorizable
+ public Map<TSentryAuthorizable, TSentryPrivilegeMap> listPrivilegsbyAuthorizable
(
String requestorUserName,
Set<List<? extends Authorizable>> authorizables, Set<String> groups,
@@ -937,7 +918,7 @@
*/
@Override
- public synchronized String getConfigValue(String propertyName, String defaultValue)
+ public String getConfigValue(String propertyName, String defaultValue)
throws SentryUserException {
TSentryConfigValueRequest request = new TSentryConfigValueRequest(
ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, propertyName);
@@ -977,7 +958,7 @@
* @param requestorUserName The name of the request user
*/
@Override
- public synchronized void importPolicy
+ public void importPolicy
(Map<String, Map<String, Set<String>>> policyFileMappingData,
String requestorUserName, boolean isOverwriteRole)
throws SentryUserException {
@@ -1022,7 +1003,7 @@
// export the sentry mapping data with map structure
@Override
- public synchronized Map<String, Map<String, Set<String>>> exportPolicy(String
+ public Map<String, Map<String, Set<String>>> exportPolicy(String
requestorUserName,
String objectPath) throws SentryUserException {
TSentryExportMappingDataRequest request = new TSentryExportMappingDataRequest(
@@ -1065,12 +1046,23 @@
}
@Override
- public synchronized void close() {
- transportFactory.close();
+ public void close() {
+ done();
}
@Override
- public void disconnect() {
- transportFactory.releaseTransport();
+ public void done() {
+ if (transport != null) {
+ transportPool.returnTransport(transport);
+ transport = null;
+ }
+ }
+
+ @Override
+ public void invalidate() {
+ if (transport != null) {
+ transportPool.invalidateTransport(transport);
+ transport = null;
+ }
}
}
\ No newline at end of file
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/tools/SentryShellHive.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/tools/SentryShellHive.java
index 1d09846..09f17ed 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/tools/SentryShellHive.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/tools/SentryShellHive.java
@@ -40,36 +40,39 @@
public void run() throws Exception {
Command command = null;
- SentryPolicyServiceClient client = SentryServiceClientFactory.create(getSentryConf());
- UserGroupInformation ugi = UserGroupInformation.getLoginUser();
- String requestorName = ugi.getShortUserName();
- if (isCreateRole) {
- command = new CreateRoleCmd(roleName);
- } else if (isDropRole) {
- command = new DropRoleCmd(roleName);
- } else if (isAddRoleGroup) {
- command = new GrantRoleToGroupsCmd(roleName, groupName);
- } else if (isDeleteRoleGroup) {
- command = new RevokeRoleFromGroupsCmd(roleName, groupName);
- } else if (isGrantPrivilegeRole) {
- command = new GrantPrivilegeToRoleCmd(roleName, privilegeStr);
- } else if (isRevokePrivilegeRole) {
- command = new RevokePrivilegeFromRoleCmd(roleName, privilegeStr);
- } else if (isListRole) {
- command = new ListRolesCmd(groupName);
- } else if (isListPrivilege) {
- command = new ListPrivilegesCmd(roleName);
- }
+ try(SentryPolicyServiceClient client =
+ SentryServiceClientFactory.create(getSentryConf())) {
+ UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+ String requestorName = ugi.getShortUserName();
- // check the requestor name
- if (StringUtils.isEmpty(requestorName)) {
- // The exception message will be recoreded in log file.
- throw new Exception("The requestor name is empty.");
- }
+ if (isCreateRole) {
+ command = new CreateRoleCmd(roleName);
+ } else if (isDropRole) {
+ command = new DropRoleCmd(roleName);
+ } else if (isAddRoleGroup) {
+ command = new GrantRoleToGroupsCmd(roleName, groupName);
+ } else if (isDeleteRoleGroup) {
+ command = new RevokeRoleFromGroupsCmd(roleName, groupName);
+ } else if (isGrantPrivilegeRole) {
+ command = new GrantPrivilegeToRoleCmd(roleName, privilegeStr);
+ } else if (isRevokePrivilegeRole) {
+ command = new RevokePrivilegeFromRoleCmd(roleName, privilegeStr);
+ } else if (isListRole) {
+ command = new ListRolesCmd(groupName);
+ } else if (isListPrivilege) {
+ command = new ListPrivilegesCmd(roleName);
+ }
- if (command != null) {
- command.execute(client, requestorName);
+ // check the requestor name
+ if (StringUtils.isEmpty(requestorName)) {
+ // The exception message will be recoreded in log file.
+ throw new Exception("The requestor name is empty.");
+ }
+
+ if (command != null) {
+ command.execute(client, requestorName);
+ }
}
}
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
deleted file mode 100644
index acf9b05..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.service.thrift;
-
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-
-import com.google.common.net.HostAndPort;
-import org.apache.commons.pool2.PooledObjectFactory;
-import org.apache.commons.pool2.impl.AbandonedConfig;
-import org.apache.commons.pool2.impl.GenericObjectPool;
-import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.core.common.exception.SentryUserException;
-import org.apache.sentry.core.common.transport.SentryClientInvocationHandler;
-import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
-import org.apache.sentry.core.common.utils.ThriftUtil;
-import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The PoolClientInvocationHandler is a proxy class for handling thrift
- * call. For every thrift call, get the instance of
- * SentryPolicyServiceBaseClient from the commons-pool, and return the instance
- * to the commons-pool after complete the call. For any exception with the call,
- * discard the instance and create a new one added to the commons-pool. Then,
- * get the instance and do the call again. For the thread safe, the commons-pool
- * will manage the connection pool, and every thread can get the connection by
- * borrowObject() and return the connection to the pool by returnObject().
- *
- * TODO: Current pool model does not manage the opening connections very well,
- * e.g. opening connections with failed servers should be closed promptly.
- */
-
-public class PoolClientInvocationHandler extends SentryClientInvocationHandler {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(PoolClientInvocationHandler.class);
-
- private static final String POOL_EXCEPTION_MESSAGE = "Pool exception occurred ";
-
- private final Configuration conf;
-
- /**
- * The configuration to use for our object pools.
- * Null if we are not using object pools.
- */
- private final GenericObjectPoolConfig poolConfig;
-
- /**
- * The total number of connection retries to attempt per endpoint.
- */
- private final int connectionRetryTotal;
-
- /**
- * The configured sentry servers.
- */
- private final Endpoint[] endpoints;
-
- /**
- * The endpoint which we are currently using. This can be read without any locks.
- * It must be written while holding the endpoints lock.
- */
- private volatile int freshestEndpointIdx = 0;
-
- private class Endpoint {
- /**
- * The server address or hostname.
- */
- private final String addr;
-
- /**
- * The server port.
- */
- private final int port;
-
- /**
- * The server's poolFactory used to create new clients.
- */
- private final PooledObjectFactory<SentryPolicyServiceClient> poolFactory;
-
- /**
- * The server's pool of cached clients.
- */
- private final GenericObjectPool<SentryPolicyServiceClient> pool;
-
- Endpoint(String addr, int port) {
- this.addr = addr;
- this.port = port;
- this.poolFactory = new SentryServiceClientPoolFactory(addr, port, conf);
- this.pool = new GenericObjectPool<SentryPolicyServiceClient>(
- this.poolFactory, poolConfig, new AbandonedConfig());
- }
-
- GenericObjectPool<SentryPolicyServiceClient> getPool() {
- return pool;
- }
-
- String getEndPointStr() {
- return new String("endpoint at [address " + addr + ", port " + port + "]");
- }
- }
-
- public PoolClientInvocationHandler(Configuration conf) throws Exception {
- this.conf = conf;
-
- this.poolConfig = new GenericObjectPoolConfig();
- // config the pool size for commons-pool
- this.poolConfig.setMaxTotal(conf.getInt(ClientConfig.SENTRY_POOL_MAX_TOTAL,
- ClientConfig.SENTRY_POOL_MAX_TOTAL_DEFAULT));
- this.poolConfig.setMinIdle(conf.getInt(ClientConfig.SENTRY_POOL_MIN_IDLE,
- ClientConfig.SENTRY_POOL_MIN_IDLE_DEFAULT));
- this.poolConfig.setMaxIdle(conf.getInt(ClientConfig.SENTRY_POOL_MAX_IDLE,
- ClientConfig.SENTRY_POOL_MAX_IDLE_DEFAULT));
-
- // get the retry number for reconnecting service
- this.connectionRetryTotal = conf.getInt(ClientConfig.SENTRY_POOL_RETRY_TOTAL,
- ClientConfig.SENTRY_POOL_RETRY_TOTAL_DEFAULT);
-
- String hostsAndPortsStr = conf.get(ClientConfig.SERVER_RPC_ADDRESS);
- if (hostsAndPortsStr == null) {
- throw new RuntimeException("Config key " +
- ClientConfig.SERVER_RPC_ADDRESS + " is required");
- }
- int defaultPort = conf.getInt(ClientConfig.SERVER_RPC_PORT,
- ClientConfig.SERVER_RPC_PORT_DEFAULT);
- String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
- HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, defaultPort);
- this.endpoints = new Endpoint[hostsAndPorts.length];
- for (int i = 0; i < this.endpoints.length; i++) {
- this.endpoints[i] = new Endpoint(hostsAndPorts[i].getHostText(),hostsAndPorts[i].getPort());
- LOGGER.info("Initiate sentry sever endpoint: hostname " +
- hostsAndPorts[i].getHostText() + ", port " + hostsAndPorts[i].getPort());
- }
- }
-
- @Override
- public Object invokeImpl(Object proxy, Method method, Object[] args)
- throws Exception {
- int retryCount = 0;
- /**
- * The maximum number of retries that we will do. Each endpoint gets its
- * own set of retries.
- */
- int retryLimit = connectionRetryTotal * endpoints.length;
-
- /**
- * The index of the endpoint to use.
- */
- int endpointIdx = freshestEndpointIdx;
-
- /**
- * A list of exceptions from each endpoint. This starts as null to avoid
- * memory allocation in the common case where there is no error.
- */
- Exception exc[] = null;
-
- Object ret = null;
-
- while (retryCount < retryLimit) {
- GenericObjectPool<SentryPolicyServiceClient> pool =
- endpoints[endpointIdx].getPool();
- try {
- if ((exc != null) &&
- (exc[endpointIdx] instanceof TTransportException)) {
- // If there was a TTransportException last time we tried to contact
- // this endpoint, attempt to create a new connection before we try
- // again.
- synchronized (endpoints) {
- // If there has room, create new instance and add it to the
- // commons-pool. This instance will be returned first from the
- // commons-pool, because the configuration is LIFO.
- if (pool.getNumIdle() + pool.getNumActive() < pool.getMaxTotal()) {
- pool.addObject();
- }
- }
- }
- // Try to make the RPC.
- ret = invokeFromPool(method, args, pool);
- break;
- } catch (TTransportException e) {
- if (exc == null) {
- exc = new Exception[endpoints.length];
- }
- exc[endpointIdx] = e;
- }
-
- Exception lastExc = exc[endpointIdx];
- synchronized (endpoints) {
- int curFreshestEndpointIdx = freshestEndpointIdx;
- if (curFreshestEndpointIdx == endpointIdx) {
- curFreshestEndpointIdx =
- (curFreshestEndpointIdx + 1) % endpoints.length;
- freshestEndpointIdx = curFreshestEndpointIdx;
- }
- endpointIdx = curFreshestEndpointIdx;
- }
- // Increase the retry num, and throw the exception if can't retry again.
- retryCount++;
- if (retryCount == connectionRetryTotal) {
- for (int i = 0; i < exc.length; i++) {
- // Since freshestEndpointIdx is shared by multiple threads, it is possible that
- // the ith endpoint has been tried in another thread and skipped in the current
- // thread.
- if (exc[i] != null) {
- LOGGER.error("Sentry server " + endpoints[i].getEndPointStr()
- + " is in unreachable.");
- }
- }
- throw new SentryUserException("Sentry servers are unreachable. " +
- "Diagnostics is needed for unreachable servers.", lastExc);
- }
- }
- return ret;
- }
-
- private Object invokeFromPool(Method method, Object[] args,
- GenericObjectPool<SentryPolicyServiceClient> pool) throws Exception {
- Object result = null;
- SentryPolicyServiceClient client;
- try {
- // get the connection from the pool, don't know if the connection is broken.
- client = pool.borrowObject();
- } catch (Exception e) {
- LOGGER.debug(POOL_EXCEPTION_MESSAGE, e);
- // If the exception is caused by connection problem, throw the TTransportException
- // for reconnect.
- if (e instanceof IOException) {
- throw new TTransportException(e);
- }
- throw new SentryUserException(e.getMessage(), e);
- }
- try {
- // do the thrift call
- result = method.invoke(client, args);
- } catch (InvocationTargetException e) {
- // Get the target exception, check if SentryUserException or TTransportException is wrapped.
- // TTransportException or IOException means there has connection problem with the pool.
- Throwable targetException = e.getCause();
- if (targetException instanceof SentryUserException) {
- Throwable sentryTargetException = targetException.getCause();
- // If there has connection problem, eg, invalid connection if the service restarted,
- // sentryTargetException instanceof TTransportException or IOException = true.
- if (sentryTargetException instanceof TTransportException
- || sentryTargetException instanceof IOException) {
- // If the exception is caused by connection problem, destroy the instance and
- // remove it from the commons-pool. Throw the TTransportException for reconnect.
- pool.invalidateObject(client);
- throw new TTransportException(sentryTargetException);
- }
- // The exception is thrown by thrift call, eg, SentryAccessDeniedException.
- throw (SentryUserException) targetException;
- }
- throw e;
- } finally{
- try {
- // return the instance to commons-pool
- pool.returnObject(client);
- } catch (Exception e) {
- LOGGER.error(POOL_EXCEPTION_MESSAGE, e);
- throw e;
- }
- }
- return result;
- }
-
- @Override
- public void close() {
- for (int i = 0; i < endpoints.length; i++) {
- try {
- endpoints[i].getPool().close();
- } catch (Exception e) {
- LOGGER.debug(POOL_EXCEPTION_MESSAGE, e);
- }
- }
- }
-}
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
index 9beb07b..ec938da 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
@@ -313,7 +313,8 @@
hmsFollowerExecutor.scheduleAtFixedRate(hmsFollower,
initDelay, period, TimeUnit.MILLISECONDS);
} catch (IllegalArgumentException e) {
- LOGGER.error(String.format("Could not start HMSFollower due to illegal argument. period is %s ms", period), e);
+ LOGGER.error(String.format("Could not start HMSFollower due to illegal argument. period is %s ms",
+ period), e);
throw e;
}
}
@@ -381,7 +382,7 @@
sentryStoreCleanService.scheduleWithFixedDelay(
storeCleaner, 0, storeCleanPeriodSecs, TimeUnit.SECONDS);
- LOGGER.info("sentry store cleaner is scheduled with interval %d seconds", storeCleanPeriodSecs);
+ LOGGER.info("sentry store cleaner is scheduled with interval {} seconds", storeCleanPeriodSecs);
}
catch(IllegalArgumentException e){
LOGGER.error("Could not start SentryStoreCleaner due to illegal argument", e);
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
index 7db9310..f3aa587 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
@@ -19,36 +19,120 @@
package org.apache.sentry.service.thrift;
import java.lang.reflect.Proxy;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.sentry.core.common.transport.RetryClientInvocationHandler;
import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig;
+import org.apache.sentry.core.common.transport.SentryTransportFactory;
+import org.apache.sentry.core.common.transport.SentryTransportPool;
import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl;
-import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.annotation.concurrent.ThreadSafe;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.apache.sentry.core.common.utils.SentryConstants.KERBEROS_MODE;
+
+@ThreadSafe
public final class SentryServiceClientFactory {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SentryServiceClientFactory.class);
+
private static final SentryPolicyClientTransportConfig transportConfig =
new SentryPolicyClientTransportConfig();
+ private final Configuration conf;
+ private final SentryTransportPool transportPool;
- private SentryServiceClientFactory() {
+ private static final AtomicReference<SentryServiceClientFactory> clientFactory =
+ new AtomicReference<>();
+
+ /**
+ * Create a client instance. The supplied configuration is only used the first time and
+ * ignored afterwords. Tests that want to supply different configurations
+ * should call {@link #factoryReset(SentryServiceClientFactory)} to force new configuration
+ * read.
+ * @param conf Configuration
+ * @return client instance
+ * @throws Exception
+ */
+ public static SentryPolicyServiceClient create(Configuration conf) throws Exception {
+ SentryServiceClientFactory factory = clientFactory.get();
+ if (factory != null) {
+ return factory.create();
+ }
+ factory = new SentryServiceClientFactory(conf);
+ boolean ok = clientFactory.compareAndSet(null, factory);
+ if (ok) {
+ return factory.create();
+ }
+ // Close old factory
+ factory.close();
+ return clientFactory.get().create();
}
- public static SentryPolicyServiceClient create(Configuration conf) throws Exception {
- boolean pooled = conf.getBoolean(
- ClientConfig.SENTRY_POOL_ENABLED, ClientConfig.SENTRY_POOL_ENABLED_DEFAULT);
- if (pooled) {
- return (SentryPolicyServiceClient) Proxy
- .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(),
- SentryPolicyServiceClientDefaultImpl.class.getInterfaces(),
- new PoolClientInvocationHandler(conf));
- } else {
- return (SentryPolicyServiceClient) Proxy
- .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(),
- SentryPolicyServiceClientDefaultImpl.class.getInterfaces(),
- new RetryClientInvocationHandler(conf,
- new SentryPolicyServiceClientDefaultImpl(conf,transportConfig), transportConfig));
+ private SentryServiceClientFactory(Configuration conf) {
+ Configuration clientConf = conf;
+
+ // When kerberos is enabled, UserGroupInformation should have been initialized with
+ // HADOOP_SECURITY_AUTHENTICATION property. There are instances where this is not done.
+ // Instead of depending on the callers to update this configuration and to be
+ // sure that UserGroupInformation is properly initialized, sentry client is explicitly
+ // doing it.
+ //
+ // This whole piece of code is a bit ugly but we want to avoid doing this in the transport
+ // code during connection establishment, so we are doing it upfront here instead.
+ boolean useKerberos = transportConfig.isKerberosEnabled(conf);
+
+ if (useKerberos) {
+ LOGGER.info("Using Kerberos authentication");
+ String authMode = conf.get(HADOOP_SECURITY_AUTHENTICATION, "");
+ if (authMode != KERBEROS_MODE) {
+ // Force auth mode to be Kerberos
+ clientConf = new Configuration(conf);
+ clientConf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS_MODE);
+ }
+ }
+
+ this.conf = clientConf;
+
+ boolean useUGI = transportConfig.useUserGroupInformation(conf);
+
+ if (useUGI) {
+ LOGGER.info("Using UserGroupInformation authentication");
+ UserGroupInformation.setConfiguration(this.conf);
+ }
+
+ transportPool = new SentryTransportPool(conf, transportConfig,
+ new SentryTransportFactory(conf, transportConfig));
+ }
+
+ private SentryPolicyServiceClient create() throws Exception {
+ return (SentryPolicyServiceClient) Proxy
+ .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(),
+ SentryPolicyServiceClientDefaultImpl.class.getInterfaces(),
+ new RetryClientInvocationHandler(conf,
+ new SentryPolicyServiceClientDefaultImpl(conf, transportPool), transportConfig));
+ }
+
+ /**
+ * Reset existing factory and return the old one.
+ * Only used by tests.
+ * @param factory new factory to use. May be null.
+ * @return
+ */
+ public static SentryServiceClientFactory factoryReset(SentryServiceClientFactory factory) {
+ return clientFactory.getAndSet(factory);
+ }
+
+ void close() {
+ try {
+ transportPool.close();
+ } catch (Exception e) {
+ LOGGER.error("failed to close transport pool", e);
}
}
}
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
deleted file mode 100644
index 0164fa6..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.service.thrift;
-
-import org.apache.commons.pool2.BasePooledObjectFactory;
-import org.apache.commons.pool2.PooledObject;
-import org.apache.commons.pool2.impl.DefaultPooledObject;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
-import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * SentryServiceClientPoolFactory is for connection pool to manage the object. Implement the related
- * method to create object, destroy object and wrap object.
- */
-
-public class SentryServiceClientPoolFactory extends BasePooledObjectFactory<SentryPolicyServiceClient> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(SentryServiceClientPoolFactory.class);
-
- private final String addr;
- private final int port;
- private final Configuration conf;
-
- public SentryServiceClientPoolFactory(String addr, int port,
- Configuration conf) {
- this.addr = addr;
- this.port = port;
- this.conf = conf;
- }
-
- @Override
- public SentryPolicyServiceClient create() throws Exception {
- LOGGER.debug("Creating Sentry Service Client...");
- return new SentryPolicyServiceClientDefaultImpl(addr, port, conf);
- }
-
- @Override
- public PooledObject<SentryPolicyServiceClient> wrap(SentryPolicyServiceClient client) {
- return new DefaultPooledObject<SentryPolicyServiceClient>(client);
- }
-
- @Override
- public void destroyObject(PooledObject<SentryPolicyServiceClient> pooledObject) {
- SentryPolicyServiceClient client = pooledObject.getObject();
- LOGGER.debug("Destroying Sentry Service Client: " + client);
- if (client != null) {
- // The close() of TSocket or TSaslClientTransport is called actually, and there has no
- // exception even there has some problems, eg, the client is closed already.
- // The close here is just try to close the socket and the client will be destroyed soon.
- client.close();
- }
- }
-}
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java
index a4dd8a6..32e67b9 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java
@@ -44,6 +44,7 @@
runTestAsSubject(new TestOperation() {
@Override
public void runTestAsSubject() throws Exception {
+ SentryServiceClientFactory oldFactory = SentryServiceClientFactory.factoryReset(null);
Configuration confWithSmallMaxMsgSize = new Configuration(conf);
confWithSmallMaxMsgSize.setLong(ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE, 20);
// create a client with a small thrift max message size
@@ -63,6 +64,7 @@
} finally {
Assert.assertEquals(true, exceptionThrown);
clientWithSmallMaxMsgSize.close();
+ SentryServiceClientFactory.factoryReset(oldFactory);
}
// client can still talk with sentry server when message size is smaller.
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
deleted file mode 100644
index a202775..0000000
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.service.thrift;
-
-import com.google.common.net.HostAndPort;
-import org.apache.sentry.core.common.utils.ThriftUtil;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestPoolClientInvocationHandler {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(TestPoolClientInvocationHandler.class);
-
- private void expectParseHostPortStrings(String hostsAndPortsStr,
- String[] expectedHosts, int[] expectedPorts) throws Exception {
- boolean success = false;
- String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
- HostAndPort[] hostsAndPorts;
- try {
- hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, 8038);
- success = true;
- } finally {
- if (!success) {
- LOGGER.error("Caught exception while parsing hosts/ports string " +
- hostsAndPortsStr);
- }
- }
- String[] hosts = new String[hostsAndPortsStrArr.length];
- int[] ports = new int[hostsAndPortsStrArr.length];
- parseHostsAndPorts(hostsAndPorts, hosts, ports);
- Assert.assertArrayEquals("Got unexpected hosts results while " +
- "parsing " + hostsAndPortsStr, expectedHosts, hosts);
- Assert.assertArrayEquals("Got unexpected ports results while " +
- "parsing " + hostsAndPortsStr, expectedPorts, ports);
- }
-
- private void parseHostsAndPorts(HostAndPort[] hostsAndPorts, String[] hosts, int[] ports) {
- for (int i = 0; i < hostsAndPorts.length; i++) {
- hosts[i] = hostsAndPorts[i].getHostText();
- ports[i] = hostsAndPorts[i].getPort();
- }
- }
-
- @SuppressWarnings("PMD.AvoidUsingHardCodedIP")
- @Test
- public void testParseHostPortStrings() throws Exception {
- expectParseHostPortStrings("foo", new String[] {"foo"}, new int[] {8038});
- expectParseHostPortStrings("foo,bar",
- new String[] {"foo", "bar"},
- new int[] {8038, 8038});
- expectParseHostPortStrings("foo:2020,bar:2021",
- new String[] {"foo", "bar"},
- new int[] {2020, 2021});
- expectParseHostPortStrings("127.0.0.1:2020,127.1.0.1",
- new String[] {"127.0.0.1", "127.1.0.1"},
- new int[] {2020, 8038});
- expectParseHostPortStrings("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:433",
- new String[] {"2001:db8:85a3:8d3:1319:8a2e:370:7348"},
- new int[] {433});
- }
-}
diff --git a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java
index bead003..7c45999 100644
--- a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java
+++ b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java
@@ -28,6 +28,7 @@
import org.apache.sentry.core.model.kafka.Host;
import org.apache.sentry.kafka.conf.KafkaAuthConf;
import org.apache.sentry.provider.db.generic.SentryGenericProviderBackend;
+import org.apache.sentry.provider.db.generic.UpdatableCache;
import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClient;
import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientFactory;
import org.apache.sentry.provider.db.generic.service.thrift.TAuthorizable;
@@ -79,8 +80,12 @@
@BeforeClass
public static void beforeTestEndToEnd() throws Exception {
+ // Stop background update thread
+ UpdatableCache.disable();
setupConf();
startSentryServer();
+ // We started a new server, invalidate all connections to the old one
+ SentryGenericServiceClientFactory.factoryReset();
setUserGroups();
setAdminPrivilege();
startKafkaServer();
@@ -88,8 +93,10 @@
@AfterClass
public static void afterTestEndToEnd() throws Exception {
- stopSentryServer();
+ // Stop background update thread
+ UpdatableCache.disable();
stopKafkaServer();
+ stopSentryServer();
}
private static void stopKafkaServer() {
@@ -170,10 +177,8 @@
}
public static void setAdminPrivilege() throws Exception {
- SentryGenericServiceClient sentryClient = null;
- try {
- /** grant all privilege to admin user */
- sentryClient = getSentryClient();
+ try (SentryGenericServiceClient sentryClient = getSentryClient()){
+ // grant all privilege to admin user
sentryClient.createRoleIfNotExist(ADMIN_USER, ADMIN_ROLE, COMPONENT);
sentryClient.addRoleToGroups(ADMIN_USER, ADMIN_ROLE, COMPONENT, Sets.newHashSet(ADMIN_GROUP));
final ArrayList<TAuthorizable> authorizables = new ArrayList<TAuthorizable>();
@@ -184,14 +189,10 @@
sentryClient.grantPrivilege(ADMIN_USER, ADMIN_ROLE, COMPONENT,
new TSentryPrivilege(COMPONENT, "kafka", authorizables,
KafkaActionConstant.ALL));
- } finally {
- if (sentryClient != null) {
- sentryClient.close();
- }
}
}
- protected static SentryGenericServiceClient getSentryClient() throws Exception {
+ static SentryGenericServiceClient getSentryClient() throws Exception {
return SentryGenericServiceClientFactory.create(getClientConfig());
}
diff --git a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java
index 0b1ef68..6d2cabf 100644
--- a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java
+++ b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java
@@ -34,6 +34,7 @@
import org.apache.sentry.core.model.kafka.Host;
import org.apache.sentry.core.model.kafka.Topic;
import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClient;
+import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientFactory;
import org.apache.sentry.provider.db.generic.service.thrift.TAuthorizable;
import org.apache.sentry.provider.db.generic.service.thrift.TSentryPrivilege;
import org.junit.Assert;
@@ -55,6 +56,8 @@
@Test
public void testProduceConsumeForSuperuser() {
+ LOGGER.debug("testProduceConsumeForSuperuser");
+ SentryGenericServiceClientFactory.factoryReset();
try {
final String SuperuserName = "test";
testProduce(SuperuserName);
@@ -66,8 +69,11 @@
@Test
public void testProduceConsumeCycle() throws Exception {
+ LOGGER.debug("testProduceConsumeCycle");
final String localhost = InetAddress.getLocalHost().getHostAddress();
+ // SentryGenericServiceClientFactory.factoryReset();
+
// START TESTING PRODUCER
try {
testProduce("user1");
diff --git a/sentry-tests/sentry-tests-sqoop/src/test/java/org/apache/sentry/tests/e2e/sqoop/AbstractSqoopSentryTestBase.java b/sentry-tests/sentry-tests-sqoop/src/test/java/org/apache/sentry/tests/e2e/sqoop/AbstractSqoopSentryTestBase.java
index 8a01e1c..80f158a 100644
--- a/sentry-tests/sentry-tests-sqoop/src/test/java/org/apache/sentry/tests/e2e/sqoop/AbstractSqoopSentryTestBase.java
+++ b/sentry-tests/sentry-tests-sqoop/src/test/java/org/apache/sentry/tests/e2e/sqoop/AbstractSqoopSentryTestBase.java
@@ -197,19 +197,14 @@
}
public static void setAdminPrivilege() throws Exception {
- SentryGenericServiceClient sentryClient = null;
- try {
- /** grant all privilege to admin user */
- sentryClient = SentryGenericServiceClientFactory.create(getClientConfig());
+ try (SentryGenericServiceClient sentryClient =
+ SentryGenericServiceClientFactory.create(getClientConfig())){
+ // grant all privilege to admin user
sentryClient.createRoleIfNotExist(ADMIN_USER, ADMIN_ROLE, COMPONENT);
sentryClient.addRoleToGroups(ADMIN_USER, ADMIN_ROLE, COMPONENT, Sets.newHashSet(ADMIN_GROUP));
sentryClient.grantPrivilege(ADMIN_USER, ADMIN_ROLE, COMPONENT,
new TSentryPrivilege(COMPONENT, SQOOP_SERVER_NAME, new ArrayList<TAuthorizable>(),
SqoopActionConstant.ALL));
- } finally {
- if (sentryClient != null) {
- sentryClient.close();
- }
}
}