SENTRY-1580: Provide pooled client connection model with HA
diff --git a/pom.xml b/pom.xml
index ad54cfd..ba13a0d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,7 +60,7 @@
     <build.helper.maven.plugin.version>1.8</build.helper.maven.plugin.version>
     <cglib.version>2.2</cglib.version>
     <commons-cli.version>1.2</commons-cli.version>
-    <commons-pool2.version>2.2</commons-pool2.version>
+    <commons-pool2.version>2.4.2</commons-pool2.version>
     <commons.lang.version>2.6</commons.lang.version>
     <commons.logging.version>1.2</commons.logging.version>
     <curator.version>2.11.1</curator.version>
diff --git a/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/hive/authz/SentryConfigTool.java b/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/hive/authz/SentryConfigTool.java
index a3140f2..8a5085b 100644
--- a/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/hive/authz/SentryConfigTool.java
+++ b/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/hive/authz/SentryConfigTool.java
@@ -291,23 +291,27 @@
     Map<String, Map<String, Set<String>>> policyFileMappingData = sentryPolicyFileFormatter.parse(
         importPolicyFilePath, authzConf);
     // todo: here should be an validator to check the data's value, format, hierarchy
-    SentryPolicyServiceClient client = SentryServiceClientFactory.create(getAuthzConf());
-    // import the mapping data to database
-    client.importPolicy(policyFileMappingData, requestorUserName, importOverwriteRole);
+    try(SentryPolicyServiceClient client =
+                SentryServiceClientFactory.create(getAuthzConf())) {
+      // import the mapping data to database
+      client.importPolicy(policyFileMappingData, requestorUserName, importOverwriteRole);
+    }
   }
 
   // export the sentry mapping data to file
   public void exportPolicy() throws Exception {
     String requestorUserName = System.getProperty("user.name", "");
-    SentryPolicyServiceClient client = SentryServiceClientFactory.create(getAuthzConf());
-    // export the sentry mapping data from database to map structure
-    Map<String, Map<String, Set<String>>> policyFileMappingData = client
-        .exportPolicy(requestorUserName, objectPath);
-    // get the FileFormatter according to the configuration
-    SentryPolicyFileFormatter sentryPolicyFileFormatter = SentryPolicyFileFormatFactory
-        .createFileFormatter(authzConf);
-    // write the sentry mapping data to exportPolicyFilePath with the data in map structure
-    sentryPolicyFileFormatter.write(exportPolicyFilePath, policyFileMappingData);
+    try (SentryPolicyServiceClient client =
+                SentryServiceClientFactory.create(getAuthzConf())) {
+      // export the sentry mapping data from database to map structure
+      Map<String, Map<String, Set<String>>> policyFileMappingData = client
+              .exportPolicy(requestorUserName, objectPath);
+      // get the FileFormatter according to the configuration
+      SentryPolicyFileFormatter sentryPolicyFileFormatter = SentryPolicyFileFormatFactory
+              .createFileFormatter(authzConf);
+      // write the sentry mapping data to exportPolicyFilePath with the data in map structure
+      sentryPolicyFileFormatter.write(exportPolicyFilePath, policyFileMappingData);
+    }
   }
 
   // list permissions for given user
diff --git a/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListenerBase.java b/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListenerBase.java
index 262db11..2abdd53 100644
--- a/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListenerBase.java
+++ b/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListenerBase.java
@@ -356,11 +356,11 @@
       throws SentryUserException, IOException, MetaException {
     String requestorUserName = UserGroupInformation.getCurrentUser()
         .getShortUserName();
-    SentryPolicyServiceClient sentryClient = getSentryServiceClient();
-    sentryClient.dropPrivileges(requestorUserName, authorizableTable);
-
-    // Close the connection after dropping privileges is done.
-    sentryClient.close();
+    try(SentryPolicyServiceClient sentryClient = getSentryServiceClient()) {
+      sentryClient.dropPrivileges(requestorUserName, authorizableTable);
+    } catch (Exception e) { // report drop/close failures to the caller instead of swallowing them
+      throw new MetaException("Failed to drop Sentry privileges: " + e.getMessage());
+    }
   }
 
   private void renameSentryTablePrivilege(String oldDbName, String oldTabName,
@@ -379,9 +379,7 @@
     if (!oldTabName.equalsIgnoreCase(newTabName)
         && syncWithPolicyStore(AuthzConfVars.AUTHZ_SYNC_ALTER_WITH_POLICY_STORE)) {
 
-      SentryPolicyServiceClient sentryClient = getSentryServiceClient();
-
-      try {
+      try (SentryPolicyServiceClient sentryClient = getSentryServiceClient()){
         String requestorUserName = UserGroupInformation.getCurrentUser()
             .getShortUserName();
         sentryClient.renamePrivileges(requestorUserName, oldAuthorizableTable, newAuthorizableTable);
@@ -392,10 +390,8 @@
             + " Error: " + e.getMessage());
       } catch (IOException e) {
         throw new MetaException("Failed to find local user " + e.getMessage());
-      } finally {
-
-        // Close the connection after renaming privileges is done.
-        sentryClient.close();
+      } catch (Exception e) {
+        e.printStackTrace();
       }
     }
     // The HDFS plugin needs to know if it's a path change (set location)
diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/hadoop/hive/ql/exec/SentryGrantRevokeTask.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/hadoop/hive/ql/exec/SentryGrantRevokeTask.java
index da587d0..3f06cae 100644
--- a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/hadoop/hive/ql/exec/SentryGrantRevokeTask.java
+++ b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/hadoop/hive/ql/exec/SentryGrantRevokeTask.java
@@ -98,7 +98,6 @@
   private static final int terminator = Utilities.newLineCode;
   private static final long serialVersionUID = -7625118066790571999L;
 
-  private SentryPolicyServiceClient sentryClient;
   private HiveConf conf;
   private HiveAuthzBinding hiveAuthzBinding;
   private HiveAuthzConf authzConf;
@@ -116,13 +115,8 @@
 
   @Override
   public int execute(DriverContext driverContext) {
-    try {
-      try {
-        this.sentryClient = SentryServiceClientFactory.create(authzConf);
-      } catch (Exception e) {
-        String msg = "Error creating Sentry client: " + e.getMessage();
-        throw new RuntimeException(msg, e);
-      }
+    try (SentryPolicyServiceClient sentryClient =
+                 SentryServiceClientFactory.create(authzConf)) {
       Preconditions.checkNotNull(hiveAuthzBinding, "HiveAuthzBinding cannot be null");
       Preconditions.checkNotNull(authzConf, "HiveAuthConf cannot be null");
       Preconditions.checkNotNull(subject, "Subject cannot be null");
@@ -179,9 +173,6 @@
       console.printError(msg);
       return RETURN_CODE_FAILURE;
     } finally {
-      if (sentryClient != null) {
-        sentryClient.close();
-      }
       if (hiveAuthzBinding != null) {
         hiveAuthzBinding.close();
       }
diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListener.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListener.java
index fdb6df4..3ec2eed 100644
--- a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListener.java
+++ b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListener.java
@@ -364,11 +364,12 @@
       throws SentryUserException, IOException, MetaException {
     String requestorUserName = UserGroupInformation.getCurrentUser()
         .getShortUserName();
-    SentryPolicyServiceClient sentryClient = getSentryServiceClient();
-    sentryClient.dropPrivileges(requestorUserName, authorizableTable);
-
-    // Close the connection after dropping privileges is done.
-    sentryClient.close();
+    try (SentryPolicyServiceClient sentryClient = SentryServiceClientFactory.create(authzConf)) {
+      sentryClient.dropPrivileges(requestorUserName, authorizableTable);
+    } catch (Exception e) {
+      throw new MetaException("Failed to connect to Sentry service "
+              + e.getMessage());
+    }
   }
 
   private void renameSentryTablePrivilege(String oldDbName, String oldTabName,
@@ -403,7 +404,11 @@
       } finally {
 
         // Close the connection after renaming privileges is done.
-        sentryClient.close();
+        try {
+          sentryClient.close();
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
       }
     }
     // The HDFS plugin needs to know if it's a path change (set location)
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
index 4851114..f5d4431 100644
--- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
@@ -365,9 +365,7 @@
   }
 
   private <T> T execute(Command<T> cmd) throws KafkaException {
-    SentryGenericServiceClient client = null;
-    try {
-      client = getClient();
+    try (SentryGenericServiceClient client  = getClient()){
       return cmd.run(client);
     } catch (SentryUserException ex) {
       String msg = "Unable to excute command on sentry server: " + ex.getMessage();
@@ -377,10 +375,6 @@
       String msg = "Unable to obtain client:" + ex.getMessage();
       LOG.error(msg, ex);
       throw new KafkaException(msg, ex);
-    } finally {
-      if (client != null) {
-        client.close();
-      }
     }
   }
 
diff --git a/sentry-binding/sentry-binding-solr/src/main/java/org/apache/sentry/binding/solr/authz/SolrAuthzBinding.java b/sentry-binding/sentry-binding-solr/src/main/java/org/apache/sentry/binding/solr/authz/SolrAuthzBinding.java
index 2400673..37adb56 100644
--- a/sentry-binding/sentry-binding-solr/src/main/java/org/apache/sentry/binding/solr/authz/SolrAuthzBinding.java
+++ b/sentry-binding/sentry-binding-solr/src/main/java/org/apache/sentry/binding/solr/authz/SolrAuthzBinding.java
@@ -298,9 +298,7 @@
     if (!isSyncEnabled()) {
       return;
     }
-    SentryGenericServiceClient client = null;
-    try {
-      client = getClient();
+    try (SentryGenericServiceClient client = getClient()) {
       TSentryPrivilege tPrivilege = new TSentryPrivilege();
       tPrivilege.setComponent(AuthorizationComponent.Search);
       tPrivilege.setServiceName(authzConf.get(SENTRY_SEARCH_SERVICE_KEY,
@@ -316,10 +314,6 @@
           " can't delete privileges for collection " + collection);
     } catch (Exception ex) {
       throw new SentrySolrAuthorizationException("Unable to obtain client:" + ex.getMessage());
-    } finally {
-      if (client != null) {
-        client.close();
-      }
     }
   }
 }
diff --git a/sentry-binding/sentry-binding-sqoop/src/main/java/org/apache/sentry/sqoop/binding/SqoopAuthBinding.java b/sentry-binding/sentry-binding-sqoop/src/main/java/org/apache/sentry/sqoop/binding/SqoopAuthBinding.java
index 84a61cc..11e2aa4 100644
--- a/sentry-binding/sentry-binding-sqoop/src/main/java/org/apache/sentry/sqoop/binding/SqoopAuthBinding.java
+++ b/sentry-binding/sentry-binding-sqoop/src/main/java/org/apache/sentry/sqoop/binding/SqoopAuthBinding.java
@@ -408,9 +408,7 @@
   }
 
   private <T> T execute(Command<T> cmd) throws SqoopException {
-    SentryGenericServiceClient client = null;
-    try {
-      client = getClient();
+    try (SentryGenericServiceClient client = getClient()){
       return cmd.run(client);
     } catch (SentryUserException ex) {
       String msg = "Unable to excute command on sentry server: " + ex.getMessage();
@@ -420,10 +418,6 @@
       String msg = "Unable to obtain client:" + ex.getMessage();
       LOG.error(msg, ex);
       throw new SqoopException(SecurityError.AUTH_0014, msg, ex);
-    } finally {
-      if (client != null) {
-        client.close();
-      }
     }
   }
 }
diff --git a/sentry-core/sentry-core-common/pom.xml b/sentry-core/sentry-core-common/pom.xml
index e1be256..2e7f5c5 100644
--- a/sentry-core/sentry-core-common/pom.xml
+++ b/sentry-core/sentry-core-common/pom.xml
@@ -66,6 +66,10 @@
       <groupId>org.apache.thrift</groupId>
       <artifactId>libthrift</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-pool2</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
index 34a594e..112f050 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
@@ -25,7 +25,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
@@ -46,17 +45,17 @@
  * <p>
  */
 
-public class RetryClientInvocationHandler extends SentryClientInvocationHandler {
+public final class RetryClientInvocationHandler extends SentryClientInvocationHandler {
   private static final Logger LOGGER =
     LoggerFactory.getLogger(RetryClientInvocationHandler.class);
-  private SentryServiceClient client = null;
+  private SentryConnection client = null;
   private final int maxRetryCount;
 
   /**
    * Initialize the sentry configurations, including rpc retry count and client connection
    * configs for SentryPolicyServiceClientDefaultImpl
    */
-  public RetryClientInvocationHandler(Configuration conf, SentryServiceClient clientObject,
+  public RetryClientInvocationHandler(Configuration conf, SentryConnection clientObject,
                                       SentryClientTransportConfigInterface transportConfig) {
     Preconditions.checkNotNull(conf, "Configuration object cannot be null");
     Preconditions.checkNotNull(clientObject, "Client Object cannot be null");
@@ -72,76 +71,78 @@
    * resend the thrift call) no more than rpcRetryTotal times. Throw SentryUserException
    * if failed retry after rpcRetryTotal times.
    * if it is failed with other exception, method would just re-throw the exception.
-   * Synchronized it for thread safety.
    */
   @Override
   public synchronized Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception {
-    int retryCount = 0;
     Exception lastExc = null;
 
-    while (retryCount < maxRetryCount) {
-      // Connect to a sentry server if not connected yet.
-      try {
-        client.connect();
-      } catch (IOException e) {
-        // Increase the retry num
-        // Retry when the exception is caused by connection problem.
-        retryCount++;
-        lastExc = e;
-        close();
-        continue;
-      }
+    for (int retryCount = 0; retryCount < maxRetryCount; retryCount++) {
+      connect();
 
       // do the thrift call
       try {
+        LOGGER.debug("Calling client {}", method.getName());
         return method.invoke(client, args);
       } catch (InvocationTargetException e) {
         // Get the target exception, check if SentryUserException or TTransportException is wrapped.
         // TTransportException means there is a connection problem.
+        LOGGER.error("failed to execute {}", method.getName(), e);
         Throwable targetException = e.getCause();
-        if (targetException instanceof SentryUserException ||
-          targetException instanceof SentryHdfsServiceException) {
-          Throwable sentryTargetException = targetException.getCause();
-          // If there has connection problem, eg, invalid connection if the service restarted,
-          // sentryTargetException instanceof TTransportException will be true.
-          if (sentryTargetException instanceof TTransportException) {
-            // Retry when the exception is caused by connection problem.
-            lastExc = new TTransportException(sentryTargetException);
-            LOGGER.error("Thrift call failed with TTransportException", lastExc);
-            // Closing the thrift client on TTransportException. New client object is
-            // created using new socket when an attempt to reconnect is made.
-            close();
-          } else {
-            // The exception is thrown by thrift call, eg, SentryAccessDeniedException.
-            // Do not need to reconnect to the sentry server.
-            if (targetException instanceof SentryUserException) {
-              throw (SentryUserException) targetException;
-            } else {
-              throw (SentryHdfsServiceException) targetException;
-            }
-          }
-        } else {
+        if (!((targetException instanceof SentryUserException) ||
+            (targetException instanceof SentryHdfsServiceException))) {
           throw e;
         }
+        Throwable sentryTargetException = targetException.getCause();
+        // If there has connection problem, eg, invalid connection if the service restarted,
+        // sentryTargetException instanceof TTransportException will be true.
+        if (sentryTargetException instanceof TTransportException) {
+          // Retry when the exception is caused by connection problem.
+          lastExc = new TTransportException(sentryTargetException);
+          LOGGER.error("Thrift call failed", lastExc);
+          // The connection to the server is bad, inform the client of the problem
+          client.invalidate();
+        } else {
+          // Semantic exception which does not indicate the connection failure.
+          // Do not need to reconnect to the sentry server.
+          if (targetException instanceof SentryUserException) {
+            throw (SentryUserException) targetException;
+          } else {
+            throw (SentryHdfsServiceException) targetException;
+          }
+        }
       }
-
-      // Increase the retry num
-      retryCount++;
     }
 
     // Throw the exception as reaching the max rpc retry num.
     String error = String.format("Request failed, %d retries attempted ", maxRetryCount);
-    LOGGER.error(error, lastExc);
     throw new SentryUserException(error, lastExc);
   }
 
+  /**
+   * Connect the client, making up to maxRetryCount attempts.
+   * @throws Exception the last connection failure once all attempts are used up
+   */
+  private void connect() throws Exception {
+    Exception lastExc = null;
+    for (int retryCount = 0;  retryCount < maxRetryCount; retryCount++) {
+      try {
+        client.connect();
+        return;
+      } catch (Exception e) {
+        // Remember the failure and retry. Only the loop header advances
+        // retryCount: incrementing it here as well would skip every other
+        // attempt and halve the number of connection retries.
+        LOGGER.error("failed to connect", e);
+        lastExc = e;
+      }
+    }
+    assert lastExc != null;
+    throw lastExc;
+  }
+
   @Override
   public synchronized void close() {
-    try {
-      LOGGER.debug("Releasing the current client connection");
-      client.disconnect();
-    } catch (Exception e) {
-      LOGGER.error("Encountered failure while closing the connection");
-    }
+    //We are done with this client
+    client.done();
   }
 }
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
index 9ea7185..ad45305 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,6 +17,7 @@
  */
 package org.apache.sentry.core.common.transport;
 
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.sentry.core.common.exception.MissingConfigurationException;
 
@@ -29,14 +30,6 @@
  * the transport configuration.
  */
 interface SentryClientTransportConfigInterface {
-  /**
-   * @param conf configuration
-   * @return number of times client retry logic should iterate through all
-   * the servers before giving up.
-   * @throws MissingConfigurationException if property is mandatory and is missing in
-   *                                       configuration.
-   */
-  int getSentryFullRetryTotal(Configuration conf) throws MissingConfigurationException;
 
   /**
    * @param conf configuration
@@ -52,7 +45,7 @@
    * @throws MissingConfigurationException if property is mandatory and is missing in
    *                                       configuration.
    */
-  boolean isKerberosEnabled(Configuration conf) throws MissingConfigurationException;
+  boolean isKerberosEnabled(Configuration conf);
 
   /**
    * @param conf configuration
@@ -61,7 +54,7 @@
    * @throws MissingConfigurationException if property is mandatory and is missing in
    *                                       configuration.
    */
-  boolean useUserGroupInformation(Configuration conf) throws MissingConfigurationException;
+  boolean useUserGroupInformation(Configuration conf);
 
   /**
    * @param conf configuration
@@ -69,7 +62,7 @@
    * @throws MissingConfigurationException if property is mandatory and is missing in
    *                                       configuration.
    */
-  String getSentryPrincipal(Configuration conf) throws MissingConfigurationException;
+  String getSentryPrincipal(Configuration conf);
 
   /**
    * Port in RPC Addresses configured is optional
@@ -78,7 +71,7 @@
    * @throws MissingConfigurationException if property is mandatory and is missing in
    *                                       configuration.
    */
-  String getSentryServerRpcAddress(Configuration conf) throws MissingConfigurationException;
+  String getSentryServerRpcAddress(Configuration conf);
 
   /**
    * Port in RPC Addresses configured is optional. If a port is not provided for a server
@@ -88,7 +81,7 @@
    * @throws MissingConfigurationException if property is mandatory and is missing in
    *                                       configuration.
    */
-  int getServerRpcPort(Configuration conf) throws MissingConfigurationException;
+  int getServerRpcPort(Configuration conf);
 
   /**
    * @param conf configuration
@@ -99,14 +92,59 @@
    * @throws MissingConfigurationException if property is mandatory and is missing in
    *                                       configuration.
    */
-  int getServerRpcConnTimeoutInMs(Configuration conf) throws MissingConfigurationException;
+  int getServerRpcConnTimeoutInMs(Configuration conf);
 
   /**
-   *
+   * Maximum number of connections in the pool.
+   * See {@link org.apache.commons.pool2.impl.GenericObjectPoolConfig#setMaxTotal(int)}
    * @param conf configuration
-   * @return True if the client should load balance connections between multiple servers
-   * @throws MissingConfigurationException if property is mandatory and is missing in
-   *                                       configuration.
+   * @return maximum number of connection objects in the pool
    */
-   boolean isLoadBalancingEnabled(Configuration conf)throws MissingConfigurationException;
+  int getPoolMaxTotal(Configuration conf);
+
+  /**
+   * Minimum number of idle objects in the pool.
+   * See {@link org.apache.commons.pool2.impl.GenericObjectPoolConfig#setMinIdle(int)}
+   * @param conf Configuration
+   * @return Minimum idle connections to keep in the pool
+   */
+  int getPoolMinIdle(Configuration conf);
+
+  /**
+   * Maximum number of idle connections in the pool.
+   * See {@link org.apache.commons.pool2.impl.GenericObjectPoolConfig#setMaxIdle(int)}
+   * @param conf Configuration
+   * @return Maximum number of idle connections in the pool
+   */
+  int getPoolMaxIdle(Configuration conf);
+
+  /**
+   * This is the minimum amount of time an object may sit idle in the pool
+   * before it is eligible for eviction.
+   * See {@link org.apache.commons.pool2.impl.GenericObjectPoolConfig#setMinEvictableIdleTimeMillis}
+   * @param conf Configuration
+   * @return The value for the pool minimum eviction time.
+   */
+  long getMinEvictableTimeSec(Configuration conf);
+
+  /**
+   * The number of seconds to sleep between runs of the idle object evictor thread.
+   * When non-positive, no idle object evictor thread will be run.
+   * See {@link GenericObjectPoolConfig#getTimeBetweenEvictionRunsMillis()}
+   * @param conf Configuration
+   * @return The number of seconds to sleep between runs of the idle object evictor thread.
+   */
+  long getTimeBetweenEvictionRunsSec(Configuration conf);
+
+  /**
+   * @param conf configuration
+   * @return True if using load-balancing between Sentry servers
+   */
+  boolean isLoadBalancingEnabled(Configuration conf);
+
+  /**
+   * @param conf configuration
+   * @return true if transport pools are enabled
+   */
+  boolean isTransportPoolEnabled(Configuration conf);
 }
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java
index 651173e..4caa6b6 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java
@@ -19,6 +19,8 @@
 package org.apache.sentry.core.common.transport;
 
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * Defines configuration strings needed for sentry thrift clients to handle the transport level
  * operations.
@@ -27,7 +29,7 @@
  * Clients that needs these configuration string use the implementations of interface
  * <code>SentryClientTransportConfigInterface</code>.
  */
-class SentryClientTransportConstants {
+public final class SentryClientTransportConstants {
 
   /**
    * max retry num for client rpc
@@ -44,8 +46,18 @@
     "sentry.service.client.connection.full.retry-total";
   static final int SENTRY_FULL_RETRY_TOTAL_DEFAULT = 2;
 
+  /**
+   * Enable load balancing between servers
+   */
+  static final String SENTRY_CLIENT_LOAD_BALANCING =
+          "sentry.service.client.connection.loadbalance";
+  static final boolean SENTRY_CLIENT_LOAD_BALANCING_DEFAULT = true;
+
   static final int RPC_PORT_DEFAULT = 8038;
 
+  private SentryClientTransportConstants() {
+  }
+
   /**
    * Defines configuration strings needed for sentry thrift policy clients to handle
    * the transport level operations.
@@ -57,7 +69,6 @@
     //configuration for server address. It can be coma seperated list of server addresses.
     static final String SERVER_RPC_ADDRESS = "sentry.service.client.server.rpc-address";
 
-
     /**
      * This configuration parameter is only meant to be used for testing purposes.
      */
@@ -72,7 +83,7 @@
     static final int SENTRY_FULL_RETRY_TOTAL_DEFAULT =
       SentryClientTransportConstants.SENTRY_FULL_RETRY_TOTAL_DEFAULT;
 
-    static final String SECURITY_USE_UGI_TRANSPORT = "sentry.service.security.use.ugi";
+    public static final String SECURITY_USE_UGI_TRANSPORT = "sentry.service.security.use.ugi";
     static final String PRINCIPAL = "sentry.service.server.principal";
 
     //configration for the client connection timeout.
@@ -88,35 +99,39 @@
     static final String SENTRY_RPC_RETRY_TOTAL = "sentry.service.client.rpc.retry-total";
     static final int SENTRY_RPC_RETRY_TOTAL_DEFAULT = 3;
 
-    // connection pool configuration
-    static final String SENTRY_POOL_ENABLED = "sentry.service.client.connection.pool.enabled";
-    static final boolean SENTRY_POOL_ENABLED_DEFAULT = false;
+    // commons-pool configuration
+    static final String SENTRY_POOL_ENABLE = "sentry.service.client.connection.pool.enabled";
+    static final boolean SENTRY_POOL_ENABLE_DEFAULT = true;
 
-    // commons-pool configuration for pool size
+    /** Allow unlimited number of connections (idle or in use) */
     static final String SENTRY_POOL_MAX_TOTAL = "sentry.service.client.connection.pool.max-total";
-    static final int SENTRY_POOL_MAX_TOTAL_DEFAULT = 8;
+    static final int SENTRY_POOL_MAX_TOTAL_DEFAULT = -1;
     static final String SENTRY_POOL_MAX_IDLE = "sentry.service.client.connection.pool.max-idle";
-    static final int SENTRY_POOL_MAX_IDLE_DEFAULT = 8;
+    static final int SENTRY_POOL_MAX_IDLE_DEFAULT = 256;
     static final String SENTRY_POOL_MIN_IDLE = "sentry.service.client.connection.pool.min-idle";
-    static final int SENTRY_POOL_MIN_IDLE_DEFAULT = 0;
+    static final int SENTRY_POOL_MIN_IDLE_DEFAULT = 16;
+    static final String SENTRY_POOL_MIN_EVICTION_TIME_SEC =
+            "sentry.service.client.connection.pool.eviction.mintime.sec";
+    // Idle connections become evictable after 2 minutes (value is in seconds)
+    static final long SENTRY_POOL_MIN_EVICTION_TIME_SEC_DEFAULT =
+            TimeUnit.SECONDS.convert(2, TimeUnit.MINUTES);
+    static final String SENTRY_POOL_EVICTION_INTERVAL_SEC =
+            "sentry.service.client.connection.pool.eviction.interval.sec";
+    // Run eviction thread every minute (value is in seconds)
+    static final long SENTRY_POOL_EVICTION_INTERVAL_SEC_DEFAULT =
+            TimeUnit.SECONDS.convert(1L, TimeUnit.MINUTES);
 
-    // configuration to load balance the connections to the configured sentry servers
-    static final String SENTRY_CLIENT_LOAD_BALANCING = "sentry.service.client.connection.loadbalance";
-    static final boolean SENTRY_CLIENT_LOAD_BALANCING_DEFAULT = true;
-
-    // retry num for getting the connection from connection pool
-    static final String SENTRY_POOL_RETRY_TOTAL =
-      SentryClientTransportConstants.SENTRY_FULL_RETRY_TOTAL;
-    static final int SENTRY_POOL_RETRY_TOTAL_DEFAULT =
-      SentryClientTransportConstants.SENTRY_RPC_RETRY_TOTAL_DEFAULT;
-
+    static final String SENTRY_CLIENT_LOAD_BALANCING =
+            SentryClientTransportConstants.SENTRY_CLIENT_LOAD_BALANCING;
+    static final boolean SENTRY_CLIENT_LOAD_BALANCING_DEFAULT =
+            SentryClientTransportConstants.SENTRY_CLIENT_LOAD_BALANCING_DEFAULT;
   }
 
   /**
    * Defines configuration strings needed for sentry HDFS clients to handle the transport level
    * operations.
    */
-  static class HDFSClientConstants {
+  public static class HDFSClientConstants {
 
     //Default server port
     static final int SERVER_RPC_PORT_DEFAULT = SentryClientTransportConstants.RPC_PORT_DEFAULT;
@@ -142,14 +157,10 @@
     static final int SENTRY_FULL_RETRY_TOTAL_DEFAULT =
       SentryClientTransportConstants.SENTRY_FULL_RETRY_TOTAL_DEFAULT;
 
-    static final String SECURITY_USE_UGI_TRANSPORT = "sentry.hdfs.service.security.use.ugi";
+    public static final String SECURITY_USE_UGI_TRANSPORT = "sentry.hdfs.service.security.use.ugi";
 
     static final String PRINCIPAL = "sentry.hdfs.service.server.principal";
 
-    static final String RPC_ADDRESS = "sentry.hdfs.service.client.server.rpc-address";
-
-    static final String RPC_ADDRESS_DEFAULT = "0.0.0.0"; //NOPMD
-
     //configration for the client connection timeout.
     static final String SERVER_RPC_CONN_TIMEOUT =
       "sentry.hdfs.service.client.server.rpc-connection-timeout";
@@ -165,8 +176,28 @@
 
     static final int SENTRY_RPC_RETRY_TOTAL_DEFAULT = 3;
 
-    // configuration to load balance the connections to the configured sentry servers
-    static final String SENTRY_CLIENT_LOAD_BALANCING = "sentry.hdfs.service.client.connection.loadbalance";
-    static final boolean SENTRY_CLIENT_LOAD_BALANCING_DEFAULT = true;
+    // commons-pool configuration - disable pool for HDFS clients
+    static final String SENTRY_POOL_ENABLE = "sentry.hdfs.service.client.connection.pool.enable";
+    static final boolean SENTRY_POOL_ENABLE_DEFAULT = false;
+
+    /** Total maximum number of open connections. There shouldn't be many. */
+    static final String SENTRY_POOL_MAX_TOTAL = "sentry.hdfs.service.client.connection.pool.max-total";
+    static final int SENTRY_POOL_MAX_TOTAL_DEFAULT = 16;
+    /** Maximum number of idle connections to keep */
+    static final String SENTRY_POOL_MAX_IDLE = "sentry.hdfs.service.client.connection.pool.max-idle";
+    static final int SENTRY_POOL_MAX_IDLE_DEFAULT = 2;
+    static final String SENTRY_POOL_MIN_IDLE = "sentry.hdfs.service.client.connection.pool.min-idle";
+    static final int SENTRY_POOL_MIN_IDLE_DEFAULT = 1;
+    static final String SENTRY_POOL_MIN_EVICTION_TIME_SEC =
+            "sentry.hdfs.service.client.connection.pool.eviction.mintime.sec";
+    // No evictions for HDFS connections by default
+    static final long SENTRY_POOL_MIN_EVICTION_TIME_SEC_DEFAULT = 0L;
+    static final String SENTRY_POOL_EVICTION_INTERVAL_SEC =
+            "sentry.hdfs.service.client.connection.pool.eviction.interval.sec";
+    static final long SENTRY_POOL_EVICTION_INTERVAL_SEC_DEFAULT = -1L;
+    static final String SENTRY_CLIENT_LOAD_BALANCING =
+            SentryClientTransportConstants.SENTRY_CLIENT_LOAD_BALANCING;
+    static final boolean SENTRY_CLIENT_LOAD_BALANCING_DEFAULT =
+            SentryClientTransportConstants.SENTRY_CLIENT_LOAD_BALANCING_DEFAULT;
   }
 }
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryConnection.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryConnection.java
new file mode 100644
index 0000000..b5f2bcf
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryConnection.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.common.transport;
+
+/**
+ * Representation of a connection to a Sentry Server.
+ * <ul>
+ *   <li>Connection is initialized using the {@link #connect()} method.</li>
+ *   <li>When the connection is no longer used, the {@link #done()} method should be
+ *   called to deallocate any resources.</li>
+ *   <li>If the user detects that the connection is broken, they should call the
+ *   {@link #invalidate()} method. The connection can not be used after that.</li>
+ * </ul>
+ */
+public interface SentryConnection {
+  /**
+   * Connect to Sentry server.
+   * Either creates a new connection or reuses an existing one.
+   * @throws Exception on failure to connect.
+   */
+  void connect() throws Exception;
+
+  /**
+   * Disconnect from the server. May close connection or return it to a
+   * pool for reuse.
+   */
+  void done();
+
+  /**
+   * The connection is assumed to be non-working, invalidate it.
+   * A subsequent {@link #connect()} call should attempt to obtain
+   * another connection.
+   * <p>
+   * The implementation may attempt to connect
+   * to another server immediately or delay it till the call to
+   * {@link #connect()}.
+   */
+  void invalidate();
+}
\ No newline at end of file
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java
index 2d80827..1724e7f 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java
@@ -32,63 +32,92 @@
  */
 public final class SentryHDFSClientTransportConfig
   implements SentryClientTransportConfigInterface {
-  public SentryHDFSClientTransportConfig() { }
 
   @Override
-  public boolean isKerberosEnabled(Configuration conf) throws MissingConfigurationException {
+  public boolean isKerberosEnabled(Configuration conf) {
     return (conf.get(SECURITY_MODE, SentryConstants.KERBEROS_MODE).trim()
       .equalsIgnoreCase((SentryConstants.KERBEROS_MODE)));
   }
 
   @Override
-  public int getSentryFullRetryTotal(Configuration conf) throws MissingConfigurationException {
-    return conf.getInt(SENTRY_FULL_RETRY_TOTAL, SENTRY_FULL_RETRY_TOTAL_DEFAULT);
-  }
-
-  @Override
   public int getSentryRpcRetryTotal(Configuration conf) {
     return conf.getInt(SENTRY_RPC_RETRY_TOTAL, SENTRY_RPC_RETRY_TOTAL_DEFAULT);
   }
 
   @Override
-  public boolean useUserGroupInformation(Configuration conf)
-    throws MissingConfigurationException {
-    return Boolean.valueOf(conf.get(SECURITY_USE_UGI_TRANSPORT, "true"));
+  public boolean useUserGroupInformation(Configuration conf) {
+    return Boolean.parseBoolean(conf.get(SECURITY_USE_UGI_TRANSPORT, "true"));
   }
 
+  /**
+   * @throws MissingConfigurationException if the {@code PRINCIPAL} property is unset or empty
+   */
   @Override
-  public String getSentryPrincipal(Configuration conf) throws MissingConfigurationException {
+  public String getSentryPrincipal(Configuration conf) {
     String principle = conf.get(PRINCIPAL);
-    if (principle != null && !principle.isEmpty()) {
+    if ((principle != null) && !principle.isEmpty()) {
       return principle;
     }
     throw new MissingConfigurationException(PRINCIPAL);
   }
 
+  /**
+   * @throws MissingConfigurationException if the {@code SERVER_RPC_ADDRESS} property is unset or empty
+   */
   @Override
-  public String getSentryServerRpcAddress(Configuration conf)
-    throws MissingConfigurationException {
+  public String getSentryServerRpcAddress(Configuration conf) {
     String serverAddress = conf.get(SERVER_RPC_ADDRESS);
-    if (serverAddress != null && !serverAddress.isEmpty()) {
+    if ((serverAddress != null) && !serverAddress.isEmpty()) {
       return serverAddress;
     }
     throw new MissingConfigurationException(SERVER_RPC_ADDRESS);
   }
 
   @Override
-  public int getServerRpcPort(Configuration conf) throws MissingConfigurationException {
+  public int getServerRpcPort(Configuration conf) {
     return conf.getInt(SERVER_RPC_PORT, SentryClientTransportConstants.RPC_PORT_DEFAULT);
   }
 
   @Override
-  public int getServerRpcConnTimeoutInMs(Configuration conf)
-    throws MissingConfigurationException {
+  public int getServerRpcConnTimeoutInMs(Configuration conf) {
     return conf.getInt(SERVER_RPC_CONN_TIMEOUT, SERVER_RPC_CONN_TIMEOUT_DEFAULT);
   }
 
   @Override
-  public boolean isLoadBalancingEnabled(Configuration conf)
-    throws MissingConfigurationException {
-    return conf.getBoolean(SENTRY_CLIENT_LOAD_BALANCING, SENTRY_CLIENT_LOAD_BALANCING_DEFAULT);
+  public int getPoolMaxTotal(Configuration conf) {
+    return conf.getInt(SENTRY_POOL_MAX_TOTAL, SENTRY_POOL_MAX_TOTAL_DEFAULT);
+  }
+
+  @Override
+  public int getPoolMinIdle(Configuration conf) {
+    return conf.getInt(SENTRY_POOL_MIN_IDLE, SENTRY_POOL_MIN_IDLE_DEFAULT);
+  }
+
+  @Override
+  public int getPoolMaxIdle(Configuration conf) {
+    return conf.getInt(SENTRY_POOL_MAX_IDLE, SENTRY_POOL_MAX_IDLE_DEFAULT);
+  }
+
+  @Override
+  public long getMinEvictableTimeSec(Configuration conf) {
+    return conf.getLong(SENTRY_POOL_MIN_EVICTION_TIME_SEC,
+            SENTRY_POOL_MIN_EVICTION_TIME_SEC_DEFAULT);
+  }
+
+  @Override
+  public long getTimeBetweenEvictionRunsSec(Configuration conf) {
+    return conf.getLong(SENTRY_POOL_EVICTION_INTERVAL_SEC,
+            SENTRY_POOL_EVICTION_INTERVAL_SEC_DEFAULT);
+  }
+
+  @Override
+  public boolean isLoadBalancingEnabled(Configuration conf) {
+    return conf.getBoolean(SENTRY_CLIENT_LOAD_BALANCING,
+            SENTRY_CLIENT_LOAD_BALANCING_DEFAULT);
+  }
+
+  @Override
+  public boolean isTransportPoolEnabled(Configuration conf) {
+    return conf.getBoolean(SENTRY_POOL_ENABLE, SENTRY_POOL_ENABLE_DEFAULT);
   }
 }
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java
index c97fe97..45522df 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.sentry.core.common.exception.MissingConfigurationException;
 import org.apache.sentry.core.common.utils.SentryConstants;
+
 import static org.apache.sentry.core.common.transport.SentryClientTransportConstants.PolicyClientConstants.*;
 
 /**
@@ -32,63 +33,91 @@
  */
 public final class SentryPolicyClientTransportConfig
   implements SentryClientTransportConfigInterface {
-  public SentryPolicyClientTransportConfig() { }
 
   @Override
-  public boolean isKerberosEnabled(Configuration conf) throws MissingConfigurationException {
+  public boolean isKerberosEnabled(Configuration conf) {
     return (conf.get(SECURITY_MODE, SentryConstants.KERBEROS_MODE).trim()
       .equalsIgnoreCase((SentryConstants.KERBEROS_MODE)));
   }
 
   @Override
-  public int getSentryFullRetryTotal(Configuration conf) throws MissingConfigurationException {
-    return conf.getInt(SENTRY_FULL_RETRY_TOTAL, SENTRY_FULL_RETRY_TOTAL_DEFAULT);
-  }
-
-  @Override
   public int getSentryRpcRetryTotal(Configuration conf) {
     return conf.getInt(SENTRY_RPC_RETRY_TOTAL, SENTRY_RPC_RETRY_TOTAL_DEFAULT);
   }
 
   @Override
-  public boolean useUserGroupInformation(Configuration conf)
-    throws MissingConfigurationException {
+  public boolean useUserGroupInformation(Configuration conf) {
     return Boolean.valueOf(conf.get(SECURITY_USE_UGI_TRANSPORT, "true"));
   }
 
+  /**
+   * @throws MissingConfigurationException if the {@code PRINCIPAL} property is unset or empty
+   */
   @Override
-  public String getSentryPrincipal(Configuration conf) throws MissingConfigurationException {
+  public String getSentryPrincipal(Configuration conf) {
     String principle = conf.get(PRINCIPAL);
-    if (principle != null && !principle.isEmpty()) {
+    if ((principle != null) && !principle.isEmpty()) {
       return principle;
     }
     throw new MissingConfigurationException(PRINCIPAL);
   }
 
+  /**
+   * @throws MissingConfigurationException if the {@code SERVER_RPC_ADDRESS} property is unset or empty
+   */
   @Override
-  public String getSentryServerRpcAddress(Configuration conf)
-    throws MissingConfigurationException {
+  public String getSentryServerRpcAddress(Configuration conf) {
     String serverAddress = conf.get(SERVER_RPC_ADDRESS);
-    if (serverAddress != null && !serverAddress.isEmpty()) {
+    if ((serverAddress != null) && !serverAddress.isEmpty()) {
       return serverAddress;
     }
     throw new MissingConfigurationException(SERVER_RPC_ADDRESS);
   }
 
   @Override
-  public int getServerRpcPort(Configuration conf) throws MissingConfigurationException {
+  public int getServerRpcPort(Configuration conf) {
     return conf.getInt(SERVER_RPC_PORT, SentryClientTransportConstants.RPC_PORT_DEFAULT);
   }
 
   @Override
-  public int getServerRpcConnTimeoutInMs(Configuration conf)
-    throws MissingConfigurationException {
+  public int getServerRpcConnTimeoutInMs(Configuration conf) {
     return conf.getInt(SERVER_RPC_CONN_TIMEOUT, SERVER_RPC_CONN_TIMEOUT_DEFAULT);
   }
 
   @Override
-  public boolean isLoadBalancingEnabled(Configuration conf)
-    throws MissingConfigurationException {
+  public int getPoolMaxTotal(Configuration conf) {
+    return conf.getInt(SENTRY_POOL_MAX_TOTAL, SENTRY_POOL_MAX_TOTAL_DEFAULT);
+  }
+
+  @Override
+  public int getPoolMinIdle(Configuration conf) {
+    return conf.getInt(SENTRY_POOL_MIN_IDLE, SENTRY_POOL_MIN_IDLE_DEFAULT);
+  }
+
+  @Override
+  public int getPoolMaxIdle(Configuration conf) {
+    return conf.getInt(SENTRY_POOL_MAX_IDLE, SENTRY_POOL_MAX_IDLE_DEFAULT);
+  }
+
+  @Override
+  public long getMinEvictableTimeSec(Configuration conf) {
+    return conf.getLong(SENTRY_POOL_MIN_EVICTION_TIME_SEC,
+            SENTRY_POOL_MIN_EVICTION_TIME_SEC_DEFAULT);
+  }
+
+  @Override
+  public long getTimeBetweenEvictionRunsSec(Configuration conf) {
+    return conf.getLong(SENTRY_POOL_EVICTION_INTERVAL_SEC,
+            SENTRY_POOL_EVICTION_INTERVAL_SEC_DEFAULT);
+  }
+
+  @Override
+  public boolean isLoadBalancingEnabled(Configuration conf) {
     return conf.getBoolean(SENTRY_CLIENT_LOAD_BALANCING, SENTRY_CLIENT_LOAD_BALANCING_DEFAULT);
   }
+
+  @Override
+  public boolean isTransportPoolEnabled(Configuration conf) {
+    return conf.getBoolean(SENTRY_POOL_ENABLE, SENTRY_POOL_ENABLE_DEFAULT);
+  }
 }
\ No newline at end of file
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java
deleted file mode 100644
index 9a10ca5..0000000
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.core.common.transport;
-
-/**
- * Client interface for Proxy Invocation handlers
- * <p>
- * Defines interface that Sentry client's should expose to the Invocation handlers like
- * <code>RetryClientInvocationHandler</code> used to proxy the method invocation on sentry
- * client instances .
- * <p>
- * All the sentry clients that need retrying and failover capabilities should implement
- * this interface.
- */
-public interface SentryServiceClient {
-  /**
-   * Connect to Sentry server.
-   * Either creates a new connection or reuses an existing one.
-   * @throws Exception on failure to acquire a transport towards server.
-   */
-  void connect() throws Exception;
-
-  /**
-   * Disconnect from the server. May close connection or return it to a
-   * pool for reuse.
-   */
-  void disconnect();
-}
\ No newline at end of file
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java
index 74aced2..b41bdfd 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,57 +22,136 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.net.HostAndPort;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.sentry.core.common.exception.MissingConfigurationException;
 import org.apache.thrift.transport.TSaslClientTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
-import org.apache.sentry.core.common.utils.ThriftUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.ThreadSafe;
 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collections;
 
 /**
- * Create Thrift transports suitable for talking to Sentry
+ * Factory for producing connected Thrift transports.
+ * It can produce regular transports as well as Kerberos-enabled transports.
+ * <p>
+ * This class is immutable and thus thread-safe.
  */
-
-public class SentryTransportFactory {
-  protected final Configuration conf;
-  private String[] serverPrincipalParts;
-  protected TTransport thriftTransport;
-  private final int connectionTimeout;
+@ThreadSafe
+public final class SentryTransportFactory implements TransportFactory {
   private static final Logger LOGGER = LoggerFactory.getLogger(SentryTransportFactory.class);
-  // configs for connection retry
-  private final int connectionFullRetryTotal;
-  private final ArrayList<InetSocketAddress> endpoints;
-  private final SentryClientTransportConfigInterface transportConfig;
+
+  private final Configuration conf;
+  private final boolean useUgi;
+  private final String serverPrincipal;
+  private final int connectionTimeout;
+  private final boolean isKerberosEnabled;
   private static final ImmutableMap<String, String> SASL_PROPERTIES =
     ImmutableMap.of(Sasl.SERVER_AUTH, "true", Sasl.QOP, "auth-conf");
 
   /**
+   * Initialize the object based on the sentry configuration provided.
+   *
+   * @param conf            Sentry configuration
+   * @param transportConfig transport configuration to use
+   */
+  public SentryTransportFactory(Configuration conf,
+                         SentryClientTransportConfigInterface transportConfig) {
+    // Fail fast, with a descriptive message, if either collaborator is missing.
+    this.conf = Preconditions.checkNotNull(conf, "Configuration object cannot be null");
+    Preconditions.checkNotNull(transportConfig, "Transport configuration cannot be null");
+    connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf);
+    isKerberosEnabled = transportConfig.isKerberosEnabled(conf);
+    if (isKerberosEnabled) {
+      useUgi = transportConfig.useUserGroupInformation(conf);
+      serverPrincipal = transportConfig.getSentryPrincipal(conf);
+    } else {
+      serverPrincipal = null;
+      useUgi = false;
+    }
+  }
+
+  /**
+   * Connect to the endpoint and return a connected Thrift transport.
+   * @return Connection to the endpoint
+   * @throws IOException
+   */
+  @Override
+  public TTransportWrapper getTransport(HostAndPort endpoint) throws IOException {
+    return new TTransportWrapper(connectToServer(new InetSocketAddress(endpoint.getHostText(),
+                                                 endpoint.getPort())),
+                                 endpoint);
+  }
+
+  /**
+   * Connect to the specified socket address and throw IOException if failed.
+   * @param serverAddress Address client needs to connect
+   * @return the opened transport
+   * @throws IOException if there is failure in establishing the connection.
+   */
+  private TTransport connectToServer(InetSocketAddress serverAddress) throws IOException {
+    try {
+      TTransport thriftTransport = createTransport(serverAddress);
+      thriftTransport.open();
+      return thriftTransport;
+    } catch (TTransportException e) {
+      throw new IOException("Failed to open transport: " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Create an unopened transport for the given address, SASL-wrapped when Kerberos is enabled.
+   * @param serverAddress - endpoint address
+   * @return unconnected transport
+   * @throws IOException if the resolved Kerberos principal does not have 3 parts
+   *         or the secured transport cannot be created
+   */
+  private TTransport createTransport(InetSocketAddress serverAddress)
+          throws IOException {
+    String hostName = serverAddress.getHostName();
+    int port = serverAddress.getPort();
+    TTransport socket = new TSocket(hostName, port, connectionTimeout);
+
+    if (!isKerberosEnabled) {
+      LOGGER.debug("created unprotected connection to {}:{} ", hostName, port);
+      return socket;
+    }
+    // Split the principal returned by getServerPrincipal(), not the raw configured value.
+    String principal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress());
+    String[] serverPrincipalParts = SaslRpcServer.splitKerberosName(principal);
+    if (serverPrincipalParts.length != 3) {
+      throw new IOException("Kerberos principal should have 3 parts: " + principal);
+    }
+
+    UgiSaslClientTransport connection =
+            new UgiSaslClientTransport(SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(),
+              serverPrincipalParts[0], serverPrincipalParts[1],
+              socket, useUgi);
+
+    LOGGER.debug("created secured connection to {}:{} ", hostName, port);
+    return connection;
+  }
+
+  /**
    * This transport wraps the Sasl transports to set up the right UGI context for open().
    */
-  public static class UgiSaslClientTransport extends TSaslClientTransport {
-    UserGroupInformation ugi = null;
+  private static class UgiSaslClientTransport extends TSaslClientTransport {
+    private UserGroupInformation ugi = null;
 
-    public UgiSaslClientTransport(String mechanism, String protocol,
-                                  String serverName, TTransport transport,
-                                  boolean wrapUgi, Configuration conf)
-      throws IOException, SaslException {
+    UgiSaslClientTransport(String mechanism, String protocol,
+                           String serverName, TTransport transport,
+                           boolean wrapUgi)
+            throws IOException, SaslException {
       super(mechanism, null, protocol, serverName, SASL_PROPERTIES, null,
-        transport);
+              transport);
       if (wrapUgi) {
         ugi = UserGroupInformation.getLoginUser();
       }
@@ -98,8 +177,9 @@
         } catch (IOException e) {
           throw new TTransportException("Failed to open SASL transport: " + e.getMessage(), e);
         } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
           throw new TTransportException(
-            "Interrupted while opening underlying transport: " + e.getMessage(), e);
+                  "Interrupted while opening underlying transport: " + e.getMessage(), e);
         }
       }
     }
@@ -108,203 +188,4 @@
       super.open();
     }
   }
-
-  /**
-   * Initialize the object based on the sentry configuration provided.
-   * List of configured servers are reordered randomly preventing all
-   * clients connecting to the same server.
-   *
-   * @param conf            Sentry configuration
-   * @param transportConfig transport configuration to use
-   */
-  public SentryTransportFactory(Configuration conf,
-                                SentryClientTransportConfigInterface transportConfig) throws IOException {
-
-    this.conf = conf;
-    Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
-    serverPrincipalParts = null;
-    this.transportConfig = transportConfig;
-
-    try {
-      this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf);
-      this.connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf);
-      if(transportConfig.isKerberosEnabled(conf) &&
-        transportConfig.useUserGroupInformation(conf)) {
-          // Re-initializing UserGroupInformation, if needed
-          UserGroupInformationInitializer.initialize(conf);
-      }
-      String hostsAndPortsStr = transportConfig.getSentryServerRpcAddress(conf);
-
-      int serverPort = transportConfig.getServerRpcPort(conf);
-
-      String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
-      HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, serverPort);
-
-      this.endpoints = new ArrayList<>(hostsAndPortsStrArr.length);
-      for (HostAndPort endpoint : hostsAndPorts) {
-        this.endpoints.add(
-          new InetSocketAddress(endpoint.getHostText(), endpoint.getPort()));
-        LOGGER.debug("Added server endpoint: " + endpoint.toString());
-      }
-
-      if((endpoints.size() > 1) && (transportConfig.isLoadBalancingEnabled(conf))) {
-        // Reorder endpoints randomly to prevent all clients connecting to the same endpoint
-        // and load balance the connections towards sentry servers
-        Collections.shuffle(endpoints);
-      }
-    } catch (MissingConfigurationException e) {
-      throw new RuntimeException("Sentry Thrift Client Creation Failed: " + e.getMessage(), e);
-    }
-  }
-
-  /**
-   * Initialize object based on the parameters provided provided.
-   *
-   * @param addr            Host address which the client needs to connect
-   * @param port            Host Port which the client needs to connect
-   * @param conf            Sentry configuration
-   * @param transportConfig transport configuration to use
-   */
-  public SentryTransportFactory(String addr, int port, Configuration conf,
-                                SentryClientTransportConfigInterface transportConfig) throws IOException {
-    // copy the configuration because we may make modifications to it.
-    this.conf = new Configuration(conf);
-    serverPrincipalParts = null;
-    Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
-    this.transportConfig = transportConfig;
-
-    try {
-      this.endpoints = new ArrayList<>(1);
-      this.endpoints.add(NetUtils.createSocketAddr(addr, port));
-      this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf);
-      this.connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf);
-    } catch (MissingConfigurationException e) {
-      throw new RuntimeException("Sentry Thrift Client Creation Failed: " + e.getMessage(), e);
-    }
-  }
-
-
-  /**
-   * On connection error, Iterates through all the configured servers and tries to connect.
-   * On successful connection, control returns
-   * On connection failure, continues iterating through all the configured sentry servers,
-   * and then retries the whole server list no more than connectionFullRetryTotal times.
-   * In this case, it won't introduce more latency when some server fails.
-   * <p>
-   * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries.
-   */
-  public TTransport getTransport() throws IOException {
-    IOException currentException = null;
-    for (int retryCount = 0; retryCount < connectionFullRetryTotal; retryCount++) {
-      try {
-        return connectToAvailableServer();
-      } catch (IOException e) {
-        currentException = e;
-        LOGGER.error(
-                "Failed to connect to all the configured sentry servers, Retrying again");
-      }
-    }
-    // Throws exception on reaching the connectionFullRetryTotal.
-    LOGGER.error(
-      String.format("Reach the max connection retry num %d ", connectionFullRetryTotal),
-      currentException);
-    throw currentException;
-  }
-
-  /**
-   * Iterates through all the configured servers and tries to connect.
-   * On connection error, tries to connect to next server.
-   * Control returns on successful connection OR it's done trying to all the
-   * configured servers.
-   *
-   * @throws IOException
-   */
-  private TTransport connectToAvailableServer() throws IOException {
-    IOException currentException = null;
-    for (InetSocketAddress addr : endpoints) {
-      try {
-        return connectToServer(addr);
-      } catch (IOException e) {
-        LOGGER.error(String.format("Failed connection to %s: %s",
-          addr.toString(), e.getMessage()), e);
-        currentException = e;
-      }
-    }
-    throw currentException;
-  }
-
-  /**
-   * Connect to the specified socket address and throw IOException if failed.
-   *
-   * @param serverAddress Address client needs to connect
-   * @throws Exception if there is failure in establishing the connection.
-   */
-  private TTransport connectToServer(InetSocketAddress serverAddress) throws IOException {
-    try {
-      thriftTransport = createTransport(serverAddress);
-      thriftTransport.open();
-    } catch (TTransportException e) {
-      throw new IOException("Failed to open transport: " + e.getMessage(), e);
-    } catch (MissingConfigurationException e) {
-      throw new RuntimeException(e.getMessage(), e);
-    }
-
-    LOGGER.debug("Successfully opened transport: " + thriftTransport + " to " + serverAddress);
-    return thriftTransport;
-  }
-
-  /**
-   * New socket is is created
-   *
-   * @param serverAddress
-   * @return
-   * @throws TTransportException
-   * @throws MissingConfigurationException
-   * @throws IOException
-   */
-  private TTransport createTransport(InetSocketAddress serverAddress)
-    throws TTransportException, MissingConfigurationException, IOException {
-    TTransport socket = new TSocket(serverAddress.getHostName(),
-      serverAddress.getPort(), connectionTimeout);
-
-    if (!transportConfig.isKerberosEnabled(conf)) {
-      return socket;
-    } else {
-      String serverPrincipal = transportConfig.getSentryPrincipal(conf);
-      serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress());
-      LOGGER.debug("Using server kerberos principal: " + serverPrincipal);
-      if (serverPrincipalParts == null) {
-        serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal);
-        Preconditions.checkArgument(serverPrincipalParts.length == 3,
-          "Kerberos principal should have 3 parts: " + serverPrincipal);
-      }
-
-      boolean wrapUgi = transportConfig.useUserGroupInformation(conf);
-      return new UgiSaslClientTransport(SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(),
-        serverPrincipalParts[0], serverPrincipalParts[1],
-        socket, wrapUgi, conf);
-    }
-  }
-
-  private boolean isConnected() {
-    return thriftTransport != null && thriftTransport.isOpen();
-  }
-
-  /**
-   * Method currently closes the transport
-   * TODO (Kalyan) Plan is to hold the transport and resuse it accross multiple client's
-   * That way, new connection need not be created for each new client
-   */
-  public void releaseTransport() {
-    close();
-  }
-
-  /**
-   * Method closes the transport
-   */
-  public void close() {
-    if (isConnected()) {
-      thriftTransport.close();
-    }
-  }
 }
\ No newline at end of file
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportPool.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportPool.java
new file mode 100644
index 0000000..e33dd2b
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportPool.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.core.common.transport;
+
+import com.google.common.base.Preconditions;
+import com.google.common.net.HostAndPort;
+import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.core.common.utils.ThriftUtil;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Pool of transport connections to Sentry server.
+ * The pool caches open connections to multiple Sentry servers,
+ * specified in the configuration.
+ *
+ * When transport pooling is disabled in configuration,
+ * creates transports directly.
+ */
+@ThreadSafe
+public final class SentryTransportPool implements AutoCloseable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SentryTransportPool.class);
+
+  // Used for logging to identify pool instances. This is only useful for test debugging
+  // so we do not preserve thread safety for this field.
+  private static int poolId = 0;
+  private final int id;
+
+  // True if using Object pool
+  private final boolean isPoolEnabled;
+
+  // Load balance between servers if true
+  private final boolean doLoadBalancing;
+
+  // List of all known servers
+  private final ArrayList<HostAndPort> endpoints;
+
+  // Transport pool which keeps connected transports
+  private final KeyedObjectPool<HostAndPort, TTransportWrapper> pool;
+  // Source of connected transports
+  private final TransportFactory transportFactory;
+
+  // Set when we are closed
+  private final AtomicBoolean closed = new AtomicBoolean();
+
+  /**
+   * Configure transport pool.
+   * <p>
+   * The pool accepts the following configuration:
+   * <ul>
+   *   <li>Maximum total number of objects in the pool</li>
+   *   <li>Minimum number of idle objects</li>
+   *   <li>Maximum number of idle objects</li>
+   *   <li>Minimum time before the object is evicted</li>
+   *   <li>Interval between evictions</li>
+   * </ul>
+   * @param conf Configuration
+   * @param transportConfig Configuration interface
+   * @param transportFactory Transport factory used to produce transports
+   */
+  public SentryTransportPool(Configuration conf,
+                             SentryClientTransportConfigInterface transportConfig,
+                             TransportFactory transportFactory) {
+
+    // This isn't thread-safe, but we don't care - it is only used
+    // for debugging when running tests - normal apps use a single pool
+    poolId++;
+    id = poolId;
+
+    this.transportFactory = transportFactory;
+    doLoadBalancing = transportConfig.isLoadBalancingEnabled(conf);
+    isPoolEnabled = transportConfig.isTransportPoolEnabled(conf);
+
+    // Get the comma-separated list of server addresses (entries are trimmed below)
+    String hostsAndPortsStr = transportConfig.getSentryServerRpcAddress(conf);
+    int serverPort = transportConfig.getServerRpcPort(conf);
+    LOGGER.info("Creating pool for {} with default port {}",
+            hostsAndPortsStr, serverPort);
+    String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
+    Preconditions.checkArgument(hostsAndPortsStrArr.length > 0,
+            "At least one server should be specified");
+
+    endpoints = new ArrayList<>(hostsAndPortsStrArr.length);
+    for(String addr: hostsAndPortsStrArr) {
+      HostAndPort endpoint = ThriftUtil.parseAddress(addr.trim(), serverPort);
+      LOGGER.info("Adding endpoint {}", endpoint);
+      endpoints.add(endpoint);
+    }
+
+    if (!isPoolEnabled) {
+      // No pool: getTransport() obtains transports straight from the factory
+      pool = null;
+      LOGGER.info("Connection pooling is disabled");
+      return;
+    }
+
+    LOGGER.info("Connection pooling is enabled");
+    // Set pool configuration based on Configuration settings
+    GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig();
+
+    // Don't limit maximum number of objects in the pool
+    poolConfig.setMaxTotal(-1);
+
+    poolConfig.setMinIdlePerKey(transportConfig.getPoolMinIdle(conf));
+    poolConfig.setMaxIdlePerKey(transportConfig.getPoolMaxIdle(conf));
+
+    // Do not block when pool is exhausted, throw exception instead
+    poolConfig.setBlockWhenExhausted(false);
+    poolConfig.setTestOnReturn(true);
+
+    // Limit the number of objects kept for each individual server (pool key)
+    poolConfig.setMaxTotalPerKey(transportConfig.getPoolMaxTotal(conf));
+    // The ...Sec getters are named for seconds while the pool setters take milliseconds
+    poolConfig.setMinEvictableIdleTimeMillis(transportConfig.getMinEvictableTimeSec(conf) * 1000L);
+    poolConfig.setTimeBetweenEvictionRunsMillis(transportConfig.getTimeBetweenEvictionRunsSec(conf) * 1000L);
+
+    // Create object pool
+    pool = new GenericKeyedObjectPool<>(new PoolFactory(this.transportFactory, id),
+            poolConfig);
+  }
+
+  public TTransportWrapper getTransport() throws Exception {
+    List<HostAndPort> servers;
+    // If we are doing load balancing and there is more than one server,
+    // shuffle a private copy of the list before obtaining a connection
+    if (doLoadBalancing && (endpoints.size() > 1)) {
+      servers = new ArrayList<>(endpoints);
+      Collections.shuffle(servers);
+    } else {
+      servers = endpoints;
+    }
+
+    // Try the servers in order and return the first transport obtained
+    Exception failure = null;
+    for(HostAndPort addr: servers) {
+      try {
+        TTransportWrapper transport =
+          isPoolEnabled ?
+                pool.borrowObject(addr) :
+                transportFactory.getTransport(addr);
+        LOGGER.debug("[{}] obtained transport {}", id, transport);
+        return transport;
+      } catch (IllegalStateException e) {
+        // Should not happen
+        LOGGER.error("Unexpected error from pool {}", id,  e);
+        failure = e;
+      } catch (Exception e) {
+        LOGGER.error("Failed to obtain transport for {}: {}",
+                addr, e.getMessage());
+        failure = e;
+      }
+    }
+    // Failed to connect to any endpoint; rethrow the most recent failure
+    assert failure != null;
+    throw failure;
+  }
+
+  /**
+   * Return a transport to the pool (close it if pooling is off or the pool is closed).
+   * @param transport transport to give back; failures are logged, not rethrown
+   */
+  public void returnTransport(TTransportWrapper transport) {
+    if (closed.get()) {
+      LOGGER.debug("Returned {} to closed pool", transport);
+      transport.close();
+      return;
+    }
+    try {
+      if (isPoolEnabled) {
+        LOGGER.debug("[{}] returning {}", id, transport);
+        pool.returnObject(transport.getAddress(), transport);
+      } else {
+        LOGGER.debug("Closing {}", transport);
+        transport.close();
+      }
+    } catch (Exception e) {
+      LOGGER.error("Failed to return {}", transport, e);
+    }
+  }
+
+  public void invalidateTransport(TTransportWrapper transport) {
+    if (closed.get()) {
+      LOGGER.debug("invalidated {} for closed pool", transport);
+      transport.close();
+      return;
+    }
+    try {
+      LOGGER.debug("[{}] Invalidating address {}", id, transport);
+      if (isPoolEnabled) {
+        HostAndPort endpoint = transport.getAddress();
+        pool.invalidateObject(endpoint, transport);
+        // Also clear the pool kept for the same endpoint. This is heavy-handed
+        // for a single bad connection, but otherwise the pool may keep handing
+        // out a whole series of bad connections to that server.
+        pool.clear(endpoint);
+      } else {
+        transport.close();
+      }
+    } catch (Exception e) {
+      LOGGER.error("Failed to invalidate {}", transport, e);
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    // Atomically mark the pool closed so that only the first caller proceeds
+    if (closed.getAndSet(true)) {
+      return;
+    }
+    LOGGER.debug("[{}] closing", id);
+    if (pool != null) {
+      LOGGER.debug("Closing pool of {}/{} endpoints",
+              pool.getNumIdle(), pool.getNumActive());
+      pool.close();
+    }
+  }
+
+  /**
+   * Factory that creates, validates and destroys pooled transports
+   */
+  private static final class PoolFactory
+          extends BaseKeyedPooledObjectFactory<HostAndPort, TTransportWrapper> {
+    private final TransportFactory transportFactory;
+    private final int id;
+
+    /**
+     * Create a pool factory associated with the given transport factory
+     * @param transportFactory - factory producing transports
+     * @param id pool id (for debugging)
+     */
+    private PoolFactory(TransportFactory transportFactory, int id) {
+      this.transportFactory = transportFactory;
+      this.id = id;
+    }
+
+    @Override
+    public boolean validateObject(HostAndPort key, PooledObject<TTransportWrapper> p) {
+      TTransportWrapper transport = p.getObject();
+      if (transport == null) {
+        LOGGER.error("No transport to validate");
+        return false;
+      }
+      if (!key.equals(transport.getAddress())) {
+        LOGGER.error("Invalid endpoint {}: does not match {}", transport, key);
+        return false;
+      }
+      try {
+        transport.flush();
+      } catch (TTransportException e) {
+        LOGGER.error("Failed to verify connection to {}", key, e);
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public TTransportWrapper create(HostAndPort key) throws Exception {
+      TTransportWrapper transportWrapper = transportFactory.getTransport(key);
+      LOGGER.debug("[{}] created {}", id, transportWrapper);
+      return transportWrapper;
+    }
+
+    @Override
+    public void destroyObject(HostAndPort key, PooledObject<TTransportWrapper> p) throws Exception {
+      TTransportWrapper transport = p.getObject();
+      if (transport != null) {
+        LOGGER.debug("[{}] Destroying endpoint {}", id, transport);
+        try {
+          transport.close();
+        } catch (RuntimeException e) {
+          LOGGER.error("fail to destroy endpoint {}", transport, e);
+        }
+      }
+    }
+
+    @Override
+    public PooledObject<TTransportWrapper> wrap(TTransportWrapper value) {
+      return new DefaultPooledObject<>(value);
+    }
+  }
+
+}
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/TTransportWrapper.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/TTransportWrapper.java
new file mode 100644
index 0000000..07283fb
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/TTransportWrapper.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.core.common.transport;
+
+import com.google.common.net.HostAndPort;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.Closeable;
+
+/**
+ * Wrapper around a Thrift transport which also provides the endpoint address.
+ * The address is represented as {@link HostAndPort} object.
+ */
+public final class TTransportWrapper implements Closeable {
+  private final TTransport transport;
+  private final HostAndPort address;
+
+  /**
+   * @param transport Thrift transport (may be in any state)
+   * @param address The address associated with this transport.
+   */
+  TTransportWrapper(TTransport transport, HostAndPort address) {
+    this.transport = transport;
+    this.address = address;
+  }
+
+  /**
+   * @return the wrapped Thrift transport
+   */
+  public TTransport getTTransport() {
+    return transport;
+  }
+
+  /**
+   * @return endpoint address for the transport
+   */
+  public HostAndPort getAddress() {
+    return address;
+  }
+
+  /**
+   * @return True if and only if the transport is open
+   */
+  public boolean isOpen() {
+    return transport.isOpen();
+  }
+
+  /**
+   * Flush the underlying transport
+   * @throws TTransportException propagated from the underlying transport
+   */
+  public void flush() throws TTransportException {
+    transport.flush();
+  }
+
+  /**
+   * @return human-readable representation of a transport.
+   * Currently this is just the endpoint address.
+   */
+  @Override
+  public String toString() {
+    return address.toString();
+  }
+
+  /**
+   * Close the underlying transport
+   */
+  @Override
+  public void close() {
+    transport.close();
+  }
+}
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/TransportFactory.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/TransportFactory.java
new file mode 100644
index 0000000..e115cbb
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/TransportFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.core.common.transport;
+
+import com.google.common.net.HostAndPort;
+
+import java.io.IOException;
+
+/**
+ * Generic transport factory interface.
+ * <p>
+ * The intention is to implement the transport pool in more abstract terms
+ * and be able to test it without actually connecting to any servers by
+ * implementing mock transport factories.
+ */
+public interface TransportFactory {
+  /**
+   * Connect to the given {@code endpoint} and return a connected Thrift transport.
+   * @return Connection to the endpoint, wrapped together with its address
+   * @throws IOException presumably when connecting fails (implementation-defined)
+   */
+  TTransportWrapper getTransport(HostAndPort endpoint) throws IOException;
+}
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java
index d9fab86..0fffb51 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java
@@ -116,12 +116,8 @@
    * (host:port). The hostname could be in ipv6 style. If port is not specified,
    * defaultPort will be used.
    */
-  public static HostAndPort[] parseHostPortStrings(String[] hostsAndPortsArr, int defaultPort) {
-    HostAndPort[] hostsAndPorts = new HostAndPort[hostsAndPortsArr.length];
-    for (int i = 0; i < hostsAndPorts.length; i++) {
-     hostsAndPorts[i] =
-          HostAndPort.fromString(hostsAndPortsArr[i]).withDefaultPort(defaultPort);
-    }
-    return hostsAndPorts;
+  public static HostAndPort parseAddress(String address, int defaultPort) {
+    return HostAndPort.fromString(address).withDefaultPort(defaultPort);
   }
+
 }
\ No newline at end of file
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java
index 34caa0e..7304fd8 100644
--- a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java
+++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java
@@ -21,7 +21,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SentryUpdater {
+class SentryUpdater {
 
   private SentryHDFSServiceClient sentryClient;
   private final Configuration conf;
@@ -29,12 +29,12 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(SentryUpdater.class);
 
-  public SentryUpdater(Configuration conf, SentryAuthorizationInfo authzInfo) throws Exception {
+  SentryUpdater(Configuration conf, SentryAuthorizationInfo authzInfo) throws Exception {
     this.conf = conf;
     this.authzInfo = authzInfo;
   }
 
-  public SentryAuthzUpdate getUpdates() {
+  SentryAuthzUpdate getUpdates() {
     if (sentryClient == null) {
       try {
         sentryClient = SentryHDFSServiceClientFactory.create(conf);
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
index 11f6894..49d2360 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
@@ -17,19 +17,9 @@
  */
 package org.apache.sentry.hdfs;
 
-import org.apache.sentry.core.common.exception.SentryHdfsServiceException;
-
-public interface SentryHDFSServiceClient {
+public interface SentryHDFSServiceClient extends AutoCloseable {
   String SENTRY_HDFS_SERVICE_NAME = "SentryHDFSService";
 
-  void notifyHMSUpdate(PathsUpdate update)
-      throws SentryHdfsServiceException;
-
-  long getLastSeenHMSPathSeqNum() throws SentryHdfsServiceException;
-
-  SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum)
-      throws SentryHdfsServiceException;
-
-  void close();
+  SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum);
 }
 
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
index 798bbef..1cdbb85 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
@@ -17,15 +17,12 @@
  */
 package org.apache.sentry.hdfs;
 
-import java.io.IOException;
-import java.util.LinkedList;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.sentry.core.common.exception.SentryHdfsServiceException;
-import org.apache.sentry.core.common.transport.SentryHDFSClientTransportConfig;
-import org.apache.sentry.core.common.transport.SentryServiceClient;
-import org.apache.sentry.core.common.transport.SentryTransportFactory;
-import org.apache.sentry.hdfs.service.thrift.SentryHDFSService;
+import org.apache.sentry.core.common.transport.SentryConnection;
+import org.apache.sentry.core.common.transport.SentryTransportPool;
+import org.apache.sentry.core.common.transport.TTransportWrapper;
+import org.apache.sentry.hdfs.ServiceConstants.ClientConfig;
 import org.apache.sentry.hdfs.service.thrift.SentryHDFSService.Client;
 import org.apache.sentry.hdfs.service.thrift.TAuthzUpdateResponse;
 import org.apache.sentry.hdfs.service.thrift.TPathsUpdate;
@@ -35,82 +32,57 @@
 import org.apache.thrift.protocol.TMultiplexedProtocol;
 import org.apache.thrift.protocol.TProtocol;
 
-import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.LinkedList;
 
 /**
  * Sentry HDFS Service Client
  * <p>
- * The public implementation of SentryHDFSServiceClient.
- * A Sentry Client in which all the operations are synchronized for thread safety
- * Note: When using this client, if there is an exception in RPC, socket can get into an inconsistent state.
- * So it is important to close and re-open the transport so that new socket is used.
+ * The class isn't thread-safe - it is up to the caller to ensure thread safety
  */
-
-public class SentryHDFSServiceClientDefaultImpl implements SentryHDFSServiceClient, SentryServiceClient {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceClientDefaultImpl.class);
+public class SentryHDFSServiceClientDefaultImpl
+        implements SentryHDFSServiceClient, SentryConnection {
+  private final boolean useCompactTransport;
   private Client client;
-  private SentryTransportFactory transportFactory;
-  private TTransport transport;
-  private Configuration conf;
+  private final SentryTransportPool transportPool;
+  private TTransportWrapper transport;
+  private final long maxMessageSize;
 
-  public SentryHDFSServiceClientDefaultImpl(Configuration conf, SentryHDFSClientTransportConfig transportConfig) throws IOException {
-    transportFactory = new SentryTransportFactory(conf, transportConfig);
-    this.conf = conf;
+  SentryHDFSServiceClientDefaultImpl(Configuration conf,
+                                     SentryTransportPool transportPool) throws IOException {
+    maxMessageSize = conf.getLong(ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE,
+            ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
+    useCompactTransport = conf.getBoolean(ClientConfig.USE_COMPACT_TRANSPORT,
+            ClientConfig.USE_COMPACT_TRANSPORT_DEFAULT);
+    this.transportPool = transportPool;
   }
 
   /**
    * Connect to the sentry server
    *
-   * @throws IOException
+   * @throws Exception
    */
   @Override
-  public synchronized void connect() throws IOException {
-    if (transport != null && transport.isOpen()) {
+  public void connect() throws Exception {
+    if ((transport != null) && transport.isOpen()) {
       return;
     }
 
-    transport = transportFactory.getTransport();
-    TProtocol tProtocol = null;
-    long maxMessageSize = conf.getLong(ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE,
-            ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
-    if (conf.getBoolean(ServiceConstants.ClientConfig.USE_COMPACT_TRANSPORT,
-            ServiceConstants.ClientConfig.USE_COMPACT_TRANSPORT_DEFAULT)) {
-      tProtocol = new TCompactProtocol(transport, maxMessageSize, maxMessageSize);
+    transport = transportPool.getTransport();
+    TProtocol tProtocol;
+    if (useCompactTransport) {
+      tProtocol = new TCompactProtocol(transport.getTTransport(), maxMessageSize, maxMessageSize);
     } else {
-      tProtocol = new TBinaryProtocol(transport, maxMessageSize, maxMessageSize, true, true);
+      tProtocol = new TBinaryProtocol(transport.getTTransport(), maxMessageSize, maxMessageSize, true, true);
     }
     TMultiplexedProtocol protocol = new TMultiplexedProtocol(
             tProtocol, SentryHDFSServiceClient.SENTRY_HDFS_SERVICE_NAME);
 
-    client = new SentryHDFSService.Client(protocol);
-    LOGGER.info("Successfully created client");
+    client = new Client(protocol);
   }
 
   @Override
-  public synchronized void notifyHMSUpdate(PathsUpdate update)
-          throws SentryHdfsServiceException {
-    try {
-      client.handle_hms_notification(update.toThrift());
-    } catch (Exception e) {
-      throw new SentryHdfsServiceException("Thrift Exception occurred !!", e);
-    }
-  }
-
-  @Override
-  public synchronized long getLastSeenHMSPathSeqNum()
-          throws SentryHdfsServiceException {
-    try {
-      return client.check_hms_seq_num(-1);
-    } catch (Exception e) {
-      throw new SentryHdfsServiceException("Thrift Exception occurred !!", e);
-    }
-  }
-
-  @Override
-  public synchronized SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum)
+  public SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum)
           throws SentryHdfsServiceException {
     SentryAuthzUpdate retVal = new SentryAuthzUpdate(new LinkedList<PermissionsUpdate>(), new LinkedList<PathsUpdate>());
     try {
@@ -132,12 +104,23 @@
   }
 
   @Override
-  public synchronized void close() {
-    transportFactory.close();
+  public void close() {
+    done();
   }
 
   @Override
-  public void disconnect() {
-    transportFactory.releaseTransport();
+  public void done() {
+    if (transport != null) {
+      transportPool.returnTransport(transport);
+      transport = null;
+    }
+  }
+
+  @Override
+  public void invalidate() {
+    if (transport != null) {
+      transportPool.invalidateTransport(transport);
+      transport = null;
+    }
   }
 }
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
index e350103..fb34b0b 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
@@ -18,28 +18,107 @@
 package org.apache.sentry.hdfs;
 
 import java.lang.reflect.Proxy;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.sentry.core.common.transport.RetryClientInvocationHandler;
 import org.apache.sentry.core.common.transport.SentryHDFSClientTransportConfig;
+import org.apache.sentry.core.common.transport.SentryTransportFactory;
+import org.apache.sentry.core.common.transport.SentryTransportPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.apache.sentry.core.common.utils.SentryConstants.KERBEROS_MODE;
 
 /**
- * Client factory to create normal client or proxy with HA invocation handler
+ * Client factory for creating HDFS service clients.
+ * This is a singleton which uses a single factory.
  */
+@ThreadSafe
 public class SentryHDFSServiceClientFactory {
-  private final static SentryHDFSClientTransportConfig transportConfig =
-          new SentryHDFSClientTransportConfig();
+  private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceClientFactory.class);
 
-  private SentryHDFSServiceClientFactory() {
-    // Make constructor private to avoid instantiation
+  private static final AtomicReference<SentryHDFSServiceClientFactory> clientFactory =
+          new AtomicReference<>();
+
+  private final SentryHDFSClientTransportConfig transportConfig =
+          new SentryHDFSClientTransportConfig();
+  private final Configuration conf;
+  private final SentryTransportPool transportPool;
+
+  /**
+   * Return a client instance created by the shared singleton factory.
+   * @param conf configuration used to build the factory on first use; unused once it exists
+   * @return new client created by the singleton factory
+   * @throws Exception propagated from client creation
+   */
+  public static SentryHDFSServiceClient create(Configuration conf) throws Exception {
+    SentryHDFSServiceClientFactory factory = clientFactory.get();
+    if (factory != null) {
+      return factory.create();
+    }
+    factory = new SentryHDFSServiceClientFactory(conf);
+    boolean ok = clientFactory.compareAndSet(null, factory);
+    if (ok) {
+      return factory.create();
+    }
+    factory.close();
+    return clientFactory.get().create();
+  }
 
-  public static SentryHDFSServiceClient create(Configuration conf)
-    throws Exception {
+  private SentryHDFSServiceClientFactory(Configuration conf) {
+    Configuration clientConf = conf;
+
+    // When kerberos is enabled, UserGroupInformation should have been initialized with
+    // HADOOP_SECURITY_AUTHENTICATION property. There are instances where this is not done.
+    // Instead of depending on the callers to update this configuration and to be
+    // sure that UserGroupInformation is properly initialized, sentry client is explicitly
+    // doing it.
+    //
+    // This whole piece of code is a bit ugly but we want to avoid doing this in the transport
+    // code during connection establishment, so we are doing it upfront here instead.
+    boolean useKerberos = transportConfig.isKerberosEnabled(conf);
+
+    if (useKerberos) {
+      LOGGER.info("Using Kerberos authentication");
+      String authMode = conf.get(HADOOP_SECURITY_AUTHENTICATION, "");
+      if (!KERBEROS_MODE.equals(authMode)) {
+        // Force auth mode to be Kerberos on a private copy of the configuration
+        clientConf = new Configuration(conf);
+        clientConf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS_MODE);
+      }
+    }
+
+    this.conf = clientConf;
+
+    boolean useUGI = transportConfig.useUserGroupInformation(conf);
+
+    if (useUGI) {
+      LOGGER.info("Using UserGroupInformation authentication");
+      UserGroupInformation.setConfiguration(this.conf);
+    }
+
+    transportPool = new SentryTransportPool(conf, transportConfig,
+            new SentryTransportFactory(conf, transportConfig));
+  }
+
+  private SentryHDFSServiceClient create() throws Exception {
     return (SentryHDFSServiceClient) Proxy
       .newProxyInstance(SentryHDFSServiceClientDefaultImpl.class.getClassLoader(),
         SentryHDFSServiceClientDefaultImpl.class.getInterfaces(),
         new RetryClientInvocationHandler(conf,
-          new SentryHDFSServiceClientDefaultImpl(conf, transportConfig), transportConfig));
+          new SentryHDFSServiceClientDefaultImpl(conf, transportPool), transportConfig));
+  }
+
+  void close() {
+    try {
+      transportPool.close();
+    } catch (Exception e) {
+      LOGGER.error("failed to close transport pool", e);
+    }
   }
 }
diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/SentryHdfsServiceIntegrationBase.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/SentryHdfsServiceIntegrationBase.java
index eccf83b..3ee3724 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/SentryHdfsServiceIntegrationBase.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/SentryHdfsServiceIntegrationBase.java
@@ -44,7 +44,10 @@
   @After
   public void after() {
     if (hdfsClient != null) {
-      hdfsClient.close();
+      try {
+        hdfsClient.close();
+      } catch (Exception ignored) { // teardown is best-effort: a failure to close the client is deliberately dropped
+      }
     }
   }
 
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java
index 75d2993..480991d 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java
@@ -77,9 +77,8 @@
     int retries = Math.max(retryCount + 1, 1); // if customer configs retryCount as Integer.MAX_VALUE, try only once
     while (retries > 0) {
       retries--;
-      SentryPolicyServiceClient policyServiceClient = null;
-      try {
-        policyServiceClient = SentryServiceClientFactory.create(conf);
+      try (SentryPolicyServiceClient policyServiceClient =
+                   SentryServiceClientFactory.create(conf)) {
         return ImmutableSet.copyOf(policyServiceClient.listPrivilegesForProvider(groups, users,
             roleSet, authorizableHierarchy));
       } catch (Exception e) {
@@ -97,10 +96,6 @@
             LOGGER.info("Sleeping is interrupted.", e1);
           }
         }
-      } finally {
-        if(policyServiceClient != null) {
-          policyServiceClient.close();
-        }
       }
     }
 
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/SentryGenericProviderBackend.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/SentryGenericProviderBackend.java
index 134012d..6c7d3ef 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/SentryGenericProviderBackend.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/SentryGenericProviderBackend.java
@@ -82,6 +82,7 @@
       } catch (NoSuchMethodException | ClassNotFoundException | InstantiationException | InvocationTargetException | IllegalAccessException e) {
         throw new RuntimeException("Failed to create privilege converter of type " + privilegeConverter, e);
       }
+      LOGGER.debug("Starting Updateable Cache");
       UpdatableCache cache = new UpdatableCache(conf, getComponentType(), getServiceName(), sentryPrivilegeConverter);
       try {
         cache.startUpdateThread(true);
@@ -110,9 +111,7 @@
     if (enableCaching) {
       return super.getPrivileges(groups, roleSet, authorizableHierarchy);
     } else {
-      SentryGenericServiceClient client = null;
-      try {
-        client = getClient();
+      try (SentryGenericServiceClient client = getClient()){
         return ImmutableSet.copyOf(client.listPrivilegesForProvider(componentType, serviceName,
             roleSet, groups, Arrays.asList(authorizableHierarchy)));
       } catch (SentryUserException e) {
@@ -121,10 +120,6 @@
       } catch (Exception e) {
         String msg = "Unable to obtain client:" + e.getMessage();
         LOGGER.error(msg, e);
-      } finally {
-        if (client != null) {
-          client.close();
-        }
       }
     }
     return ImmutableSet.of();
@@ -138,10 +133,8 @@
     if (enableCaching) {
       return super.getRoles(groups, roleSet);
     } else {
-      SentryGenericServiceClient client = null;
-      try {
+      try (SentryGenericServiceClient client = getClient()){
         Set<TSentryRole> tRoles = Sets.newHashSet();
-        client = getClient();
         //get the roles according to group
         String requestor = UserGroupInformation.getCurrentUser().getShortUserName();
         for (String group : groups) {
@@ -158,10 +151,6 @@
       } catch (Exception e) {
         String msg = "Unable to obtain client:" + e.getMessage();
         LOGGER.error(msg, e);
-      } finally {
-        if (client != null) {
-          client.close();
-        }
       }
       return ImmutableSet.of();
     }
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/UpdatableCache.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/UpdatableCache.java
index 41708c3..d20710f 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/UpdatableCache.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/UpdatableCache.java
@@ -29,9 +29,15 @@
 import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
 
-class UpdatableCache implements TableCache {
+public final class UpdatableCache implements TableCache, AutoCloseable {
   private static final Logger LOGGER = LoggerFactory.getLogger(UpdatableCache.class);
 
+  // Timer for getting updates periodically
+  private final Timer timer = new Timer();
+  private boolean initialized = false;
+  // saved timer is used by tests to cancel previous timer
+  private static Timer savedTimer;
+
   private final String componentType;
   private final String serviceName;
   private final long cacheTtlNs;
@@ -94,14 +100,13 @@
     String requestor;
     requestor = UserGroupInformation.getLoginUser().getShortUserName();
 
-    SentryGenericServiceClient client = null;
-    try {
-      client = getClient();
-      final Set<TSentryRole> tSentryRoles = client.listAllRoles(requestor, componentType);
+    try(SentryGenericServiceClient client = getClient()) {
+      Set<TSentryRole>  tSentryRoles = client.listAllRoles(requestor, componentType);
 
       for (TSentryRole tSentryRole : tSentryRoles) {
         final String roleName = tSentryRole.getRoleName();
-        final Set<TSentryPrivilege> tSentryPrivileges = client.listPrivilegesByRoleName(requestor, roleName, componentType, serviceName);
+        final Set<TSentryPrivilege> tSentryPrivileges =
+                client.listPrivilegesByRoleName(requestor, roleName, componentType, serviceName);
         for (String group : tSentryRole.getGroups()) {
           Set<String> currentPrivileges = tempCache.get(group, roleName);
           if (currentPrivileges == null) {
@@ -113,12 +118,8 @@
           }
         }
       }
-    } finally {
-      if (client != null) {
-        client.close();
-      }
+      return tempCache;
     }
-    return tempCache;
   }
 
   /**
@@ -136,7 +137,19 @@
       reloadData();
     }
 
-    Timer timer = new Timer();
+    if (initialized) {
+      LOGGER.info("Already initialized");
+      return;
+    }
+
+    initialized = true;
+    // Save timer to be able to cancel it.
+    if (savedTimer != null) {
+      LOGGER.debug("Resetting saved timer");
+      savedTimer.cancel();
+    }
+    savedTimer = timer;
+
     final long refreshIntervalMs = TimeUnit.NANOSECONDS.toMillis(cacheTtlNs);
     timer.scheduleAtFixedRate(
         new TimerTask() {
@@ -158,6 +171,7 @@
 
   private void revokeAllPrivilegesIfRequired() {
     if (++consecutiveUpdateFailuresCount > allowedUpdateFailuresCount) {
+      consecutiveUpdateFailuresCount = 0;
       // Clear cache to revoke all privileges.
       // Update table cache to point to an empty table to avoid thread-unsafe characteristics of HashBasedTable.
       this.table = HashBasedTable.create();
@@ -175,4 +189,21 @@
     final long currentTimeNs = System.nanoTime();
     return lastRefreshedNs + cacheTtlNs < currentTimeNs;
   }
+
+  /**
+   * Cancels the statically saved timer, if one is set. Only called by tests to disable timer.
+   */
+  public static void disable() {
+    if (savedTimer != null) {
+      LOGGER.info("Disabling timer");
+      savedTimer.cancel(); // the static reference itself is left in place
+    }
+  }
+
+  @Override
+  public void close() { // cancels this instance's timer; Timer.cancel() may safely be called repeatedly
+    timer.cancel();
+    if (savedTimer == timer) { savedTimer = null; } // forget the static reference only when it is ours
+    LOGGER.info("Closed Updatable Cache");
+  }
 }
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
index 11cdee7..246d0b4 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
@@ -25,7 +25,7 @@
 import org.apache.sentry.core.common.ActiveRoleSet;
 import org.apache.sentry.core.common.Authorizable;
 
-public interface SentryGenericServiceClient {
+public interface SentryGenericServiceClient extends AutoCloseable {
 
   /**
    * Create a sentry role
@@ -191,6 +191,4 @@
   Map<String, TSentryPrivilegeMap> listPrivilegsbyAuthorizable(String component,
       String serviceName, String requestorUserName, Set<String> authorizablesSet,
       Set<String> groups, ActiveRoleSet roleSet) throws SentryUserException;
-
-  void close();
 }
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
index e23d13b..6301a6b 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
@@ -17,52 +17,63 @@
  */
 package org.apache.sentry.provider.db.generic.service.thrift;
 
-import java.io.IOException;
-import java.util.*;
-
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
-
-import org.apache.sentry.core.common.exception.SentryUserException;
 import org.apache.sentry.core.common.ActiveRoleSet;
 import org.apache.sentry.core.common.Authorizable;
-import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig;
-import org.apache.sentry.core.common.transport.SentryServiceClient;
-import org.apache.sentry.core.common.transport.SentryTransportFactory;
+import org.apache.sentry.core.common.exception.SentryUserException;
+import org.apache.sentry.core.common.transport.SentryConnection;
+import org.apache.sentry.core.common.transport.SentryTransportPool;
+import org.apache.sentry.core.common.transport.TTransportWrapper;
 import org.apache.sentry.core.model.db.AccessConstants;
-import org.apache.sentry.service.thrift.ServiceConstants;
+import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyService.Client;
+import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
 import org.apache.sentry.service.thrift.Status;
 import org.apache.sentry.service.thrift.sentry_common_serviceConstants;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TMultiplexedProtocol;
 
-import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
-import com.google.common.collect.Lists;
 
 /**
- * Sentry Generic Service Client
+ * Sentry Generic Service Client.
  * <p>
- * The public implementation of SentryGenericServiceClient.
- * TODO(kalyan) A Sentry Client in which all the operations are synchronized for thread safety
- * Note: When using this client, if there is an exception in RPC, socket can get into an inconsistent state.
- * So it is important to close and re-open the transportFactory so that new socket is used.
+ * Thread safety. This class is not thread safe - it is up to the
+ * caller to ensure thread safety.
  */
+public class SentryGenericServiceClientDefaultImpl
+        implements SentryGenericServiceClient, SentryConnection {
 
-public class SentryGenericServiceClientDefaultImpl implements SentryGenericServiceClient, SentryServiceClient {
-  private SentryGenericPolicyService.Client client;
-  private SentryTransportFactory transportFactory;
-  private TTransport transport;
-  private Configuration conf;
-  private static final Logger LOGGER = LoggerFactory
-    .getLogger(SentryGenericServiceClientDefaultImpl.class);
+  private Client client;
+  private final SentryTransportPool transportPool;
+  private TTransportWrapper transport;
   private static final String THRIFT_EXCEPTION_MESSAGE = "Thrift exception occured ";
+  private final long maxMessageSize;
 
-  public SentryGenericServiceClientDefaultImpl(Configuration conf, SentryPolicyClientTransportConfig transportConfig) throws IOException {
-    transportFactory = new SentryTransportFactory(conf, transportConfig);
-    this.conf = conf;
+  /**
+   * Initialize client with the given configuration, using specified transport pool
+   * implementation for obtaining transports. No transport is obtained here; see connect().
+   * @param conf Sentry Configuration; only the Thrift max message size is read from it here
+   * @param transportPool source of connected transports
+   */
+  SentryGenericServiceClientDefaultImpl(Configuration conf,
+                                        SentryTransportPool transportPool) {
+
+    //TODO(kalyan) need to find appropriate place to add it
+    // if (kerberos) {
+    //  // since the client uses hadoop-auth, we need to set kerberos in
+    //  // hadoop-auth if we plan to use kerberos
+    //  conf.set(HADOOP_SECURITY_AUTHENTICATION, SentryConstants.KERBEROS_MODE);
+    // }
+    maxMessageSize = conf.getLong(ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE,
+            ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
+    this.transportPool = transportPool;
   }
 
   /**
@@ -71,20 +82,18 @@
    * @throws IOException
    */
   @Override
-  public synchronized void connect() throws IOException {
-    if (transport != null && transport.isOpen()) {
+  public void connect() throws Exception {
+    if ((transport != null) && transport.isOpen()) { // already holding an open transport: nothing to do
       return;
     }
 
-    transport = transportFactory.getTransport();
-    TMultiplexedProtocol protocol = null;
-    long maxMessageSize = conf.getLong(ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE,
-      ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
-    protocol = new TMultiplexedProtocol(
-      new TBinaryProtocol(transport, maxMessageSize, maxMessageSize, true, true),
+    // Obtain a transport from the pool and build a new Thrift client on top of it
+    transport = transportPool.getTransport();
+    TMultiplexedProtocol protocol = new TMultiplexedProtocol(
+      new TBinaryProtocol(transport.getTTransport(), maxMessageSize,
+              maxMessageSize, true, true),
       SentryGenericPolicyProcessor.SENTRY_GENERIC_SERVICE_NAME);
-    client = new SentryGenericPolicyService.Client(protocol);
-    LOGGER.debug("Successfully created client");
+    client = new Client(protocol); // replaces any client bound to a previous transport
   }
 
   /**
@@ -96,7 +105,7 @@
    * @throws SentryUserException
    */
   @Override
-  public synchronized void createRole(String requestorUserName, String roleName, String component)
+  public void createRole(String requestorUserName, String roleName, String component)
     throws SentryUserException {
     TCreateSentryRoleRequest request = new TCreateSentryRoleRequest();
     request.setProtocol_version(sentry_common_serviceConstants.TSENTRY_SERVICE_V2);
@@ -359,7 +368,7 @@
    * @throws SentryUserException
    */
   @Override
-  public synchronized Set<TSentryRole> listRolesByGroupName(
+  public Set<TSentryRole> listRolesByGroupName(
     String requestorUserName,
     String groupName,
     String component)
@@ -528,12 +537,23 @@
   }
 
   @Override
-  public synchronized void close() {
-    transportFactory.close();
+  public void close() {
+    done(); // closing a pooled client only hands its transport back to the pool
   }
 
   @Override
-  public void disconnect() {
-    transportFactory.releaseTransport();
+  public void done() {
+    if (transport != null) { // nothing to give back if no transport is currently held
+      transportPool.returnTransport(transport);
+      transport = null; // forget it so the next connect() asks the pool again
+    }
+  }
+
+  @Override
+  public void invalidate() {
+    if (transport != null) {
+      transportPool.invalidateTransport(transport); // presumably discards the connection rather than reusing it; confirm in SentryTransportPool
+      transport = null; // forget it so the next connect() asks the pool again
+    }
   }
 }
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
index 46ac4a3..1b47236 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
@@ -18,27 +18,131 @@
 package org.apache.sentry.provider.db.generic.service.thrift;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.sentry.core.common.transport.RetryClientInvocationHandler;
 import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig;
+import org.apache.sentry.core.common.transport.SentryTransportFactory;
+import org.apache.sentry.core.common.transport.SentryTransportPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.ThreadSafe;
 import java.lang.reflect.Proxy;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.apache.sentry.core.common.utils.SentryConstants.KERBEROS_MODE;
 
 /**
- * SentryGenericServiceClientFactory is a public class for the components which using Generic Model to create sentry client.
+ * Produces client connection for Sentry clients using Generic model.
+ * Factory is almost a singleton. Tests can call {@link #factoryReset()} to destroy the
+ * existing factory and create a new one. This may be needed because tests modify
+ * configuration and start and stop servers.
  */
+@ThreadSafe
 public final class SentryGenericServiceClientFactory {
-  private static final SentryPolicyClientTransportConfig transportConfig =
-          new SentryPolicyClientTransportConfig();
+  private static final Logger LOGGER = LoggerFactory.getLogger(SentryGenericServiceClientFactory.class);
 
-  private SentryGenericServiceClientFactory() {
+  // Used to implement a singleton
+  private static final AtomicReference<SentryGenericServiceClientFactory> clientFactory =
+          new AtomicReference<>();
+
+  private final SentryPolicyClientTransportConfig transportConfig =
+          new SentryPolicyClientTransportConfig();
+  private final SentryTransportPool transportPool;
+  private final Configuration conf;
+
+  /**
+   * Obtain a Generic policy client instance.
+   * @param conf Configuration that should be used. Configuration is only used for the
+   *             initial creation and ignored afterwards.
+   */
+  public static SentryGenericServiceClient create(Configuration conf) throws Exception {
+    SentryGenericServiceClientFactory factory = clientFactory.get();
+    if (factory != null) {
+      return factory.create();
+    }
+    factory = new SentryGenericServiceClientFactory(conf); // none published yet: build one (this also creates a transport pool)
+    boolean ok = clientFactory.compareAndSet(null, factory); // publish only if the slot is still empty
+    if (ok) {
+      return factory.create();
+    }
+    factory.close(); // the slot was filled in the meantime: release the pool of our unused factory
+    return clientFactory.get().create(); // and use whichever factory is published now
   }
 
-  public static SentryGenericServiceClient create(Configuration conf) throws Exception {
+  /**
+   * Create a new factory instance and attach it to a new connection pool instance.
+   * @param conf client configuration; it is copied, never modified, when Kerberos mode must be forced
+   */
+  private SentryGenericServiceClientFactory(Configuration conf) {
+    Configuration clientConf = conf;
+
+    // When kerberos is enabled, UserGroupInformation should have been initialized with
+    // HADOOP_SECURITY_AUTHENTICATION property. There are instances where this is not done.
+    // Instead of depending on the callers to update this configuration and to be
+    // sure that UserGroupInformation is properly initialized, sentry client is explicitly
+    // doing it.
+    //
+    // This whole piece of code is a bit ugly but we want to avoid doing this in the transport
+    // code during connection establishment, so we are doing it upfront here instead.
+    boolean useKerberos = transportConfig.isKerberosEnabled(conf);
+
+    if (useKerberos) {
+      LOGGER.info("Using Kerberos authentication");
+      String authMode = conf.get(HADOOP_SECURITY_AUTHENTICATION, "");
+      if (!KERBEROS_MODE.equals(authMode)) { // compare by value: != on Strings only tests object identity
+        // Force auth mode to be Kerberos on a private copy of the configuration
+        clientConf = new Configuration(conf);
+        clientConf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS_MODE);
+      }
+    }
+
+    this.conf = clientConf;
+
+    boolean useUGI = transportConfig.useUserGroupInformation(conf);
+
+    if (useUGI) {
+      LOGGER.info("Using UserGroupInformation authentication");
+      UserGroupInformation.setConfiguration(this.conf);
+    }
+
+    transportPool = new SentryTransportPool(conf, transportConfig,
+            new SentryTransportFactory(conf, transportConfig));
+  }
+
+  /**
+   * Create a new Generic model client backed by this factory's transport pool.
+   * @return dynamic proxy whose calls are dispatched through a RetryClientInvocationHandler
+   * @throws Exception if something goes wrong
+   */
+  private SentryGenericServiceClient create() throws Exception {
     return (SentryGenericServiceClient) Proxy
       .newProxyInstance(SentryGenericServiceClientDefaultImpl.class.getClassLoader(),
         SentryGenericServiceClientDefaultImpl.class.getInterfaces(),
         new RetryClientInvocationHandler(conf,
-          new SentryGenericServiceClientDefaultImpl(conf, transportConfig), transportConfig));
+          new SentryGenericServiceClientDefaultImpl(conf, transportPool), transportConfig));
   }
 
+  // Should only be used by tests.
+  // Detaches the current factory (if any) and closes its transport pool.
+  public static void factoryReset() {
+    LOGGER.debug("factory reset");
+    SentryGenericServiceClientFactory factory = clientFactory.getAndSet(null); // next create(conf) builds a new factory
+    if (factory != null) {
+      try {
+        factory.transportPool.close();
+      } catch (Exception e) {
+        LOGGER.error("failed to close transport pool", e); // logged and swallowed on purpose
+      }
+    }
+  }
+
+  void close() { // closes this factory's transport pool; never throws
+    try {
+      transportPool.close();
+    } catch (Exception e) {
+      LOGGER.error("failed to close transport pool", e); // logged and swallowed on purpose
+    }
+  }
 }
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryConfigToolSolr.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryConfigToolSolr.java
index 404adb8..b958b09 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryConfigToolSolr.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryConfigToolSolr.java
@@ -62,12 +62,14 @@
     String service = conf.get(SOLR_SERVICE_NAME, "service1");
     // instantiate a solr client for sentry service.  This sets the ugi, so must
     // be done before getting the ugi below.
-    SentryGenericServiceClient client = SentryGenericServiceClientFactory.create(conf);
-    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
-    String requestorName = ugi.getShortUserName();
+    try(SentryGenericServiceClient client =
+                SentryGenericServiceClientFactory.create(conf)) {
+      UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+      String requestorName = ugi.getShortUserName();
 
-    convertINIToSentryServiceCmds(component, service, requestorName, conf, client,
-        getPolicyFile(), getValidate(), getImportPolicy(), getCheckCompat());
+      convertINIToSentryServiceCmds(component, service, requestorName, conf, client,
+              getPolicyFile(), getValidate(), getImportPolicy(), getCheckCompat());
+    }
   }
 
   private Configuration getSentryConf() {
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellKafka.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellKafka.java
index d6d9014..f6e5d1b 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellKafka.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellKafka.java
@@ -48,39 +48,41 @@
     Configuration conf = getSentryConf();
 
     String service = conf.get(KAFKA_SERVICE_NAME, "kafka1");
-    SentryGenericServiceClient client = SentryGenericServiceClientFactory.create(conf);
-    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
-    String requestorName = ugi.getShortUserName();
+    try(SentryGenericServiceClient client =
+                SentryGenericServiceClientFactory.create(conf)) {
+      UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+      String requestorName = ugi.getShortUserName();
 
-    if (isCreateRole) {
-      command = new CreateRoleCmd(roleName, component);
-    } else if (isDropRole) {
-      command = new DropRoleCmd(roleName, component);
-    } else if (isAddRoleGroup) {
-      command = new AddRoleToGroupCmd(roleName, groupName, component);
-    } else if (isDeleteRoleGroup) {
-      command = new DeleteRoleFromGroupCmd(roleName, groupName, component);
-    } else if (isGrantPrivilegeRole) {
-      command = new GrantPrivilegeToRoleCmd(roleName, component,
-          privilegeStr, new KafkaTSentryPrivilegeConverter(component, service));
-    } else if (isRevokePrivilegeRole) {
-      command = new RevokePrivilegeFromRoleCmd(roleName, component,
-          privilegeStr, new KafkaTSentryPrivilegeConverter(component, service));
-    } else if (isListRole) {
-      command = new ListRolesCmd(groupName, component);
-    } else if (isListPrivilege) {
-      command = new ListPrivilegesByRoleCmd(roleName, component,
-          service, new KafkaTSentryPrivilegeConverter(component, service));
-    }
+      if (isCreateRole) {
+        command = new CreateRoleCmd(roleName, component);
+      } else if (isDropRole) {
+        command = new DropRoleCmd(roleName, component);
+      } else if (isAddRoleGroup) {
+        command = new AddRoleToGroupCmd(roleName, groupName, component);
+      } else if (isDeleteRoleGroup) {
+        command = new DeleteRoleFromGroupCmd(roleName, groupName, component);
+      } else if (isGrantPrivilegeRole) {
+        command = new GrantPrivilegeToRoleCmd(roleName, component,
+                privilegeStr, new KafkaTSentryPrivilegeConverter(component, service));
+      } else if (isRevokePrivilegeRole) {
+        command = new RevokePrivilegeFromRoleCmd(roleName, component,
+                privilegeStr, new KafkaTSentryPrivilegeConverter(component, service));
+      } else if (isListRole) {
+        command = new ListRolesCmd(groupName, component);
+      } else if (isListPrivilege) {
+        command = new ListPrivilegesByRoleCmd(roleName, component,
+                service, new KafkaTSentryPrivilegeConverter(component, service));
+      }
 
-    // check the requestor name
-    if (StringUtils.isEmpty(requestorName)) {
-      // The exception message will be recorded in log file.
-      throw new Exception("The requestor name is empty.");
-    }
+      // check the requestor name
+      if (StringUtils.isEmpty(requestorName)) {
+        // The exception message will be recorded in log file.
+        throw new Exception("The requestor name is empty.");
+      }
 
-    if (command != null) {
-      command.execute(client, requestorName);
+      if (command != null) {
+        command.execute(client, requestorName);
+      }
     }
   }
 
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellSolr.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellSolr.java
index 695c008..5385f7d 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellSolr.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellSolr.java
@@ -47,39 +47,41 @@
     Configuration conf = getSentryConf();
 
     String service = conf.get(SOLR_SERVICE_NAME, "service1");
-    SentryGenericServiceClient client = SentryGenericServiceClientFactory.create(conf);
-    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
-    String requestorName = ugi.getShortUserName();
+    try(SentryGenericServiceClient client =
+                SentryGenericServiceClientFactory.create(conf)) {
+      UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+      String requestorName = ugi.getShortUserName();
 
-    if (isCreateRole) {
-      command = new CreateRoleCmd(roleName, component);
-    } else if (isDropRole) {
-      command = new DropRoleCmd(roleName, component);
-    } else if (isAddRoleGroup) {
-      command = new AddRoleToGroupCmd(roleName, groupName, component);
-    } else if (isDeleteRoleGroup) {
-      command = new DeleteRoleFromGroupCmd(roleName, groupName, component);
-    } else if (isGrantPrivilegeRole) {
-      command = new GrantPrivilegeToRoleCmd(roleName, component,
-          privilegeStr, new SolrTSentryPrivilegeConverter(component, service));
-    } else if (isRevokePrivilegeRole) {
-      command = new RevokePrivilegeFromRoleCmd(roleName, component,
-          privilegeStr, new SolrTSentryPrivilegeConverter(component, service));
-    } else if (isListRole) {
-      command = new ListRolesCmd(groupName, component);
-    } else if (isListPrivilege) {
-      command = new ListPrivilegesByRoleCmd(roleName, component,
-          service, new SolrTSentryPrivilegeConverter(component, service));
-    }
+      if (isCreateRole) {
+        command = new CreateRoleCmd(roleName, component);
+      } else if (isDropRole) {
+        command = new DropRoleCmd(roleName, component);
+      } else if (isAddRoleGroup) {
+        command = new AddRoleToGroupCmd(roleName, groupName, component);
+      } else if (isDeleteRoleGroup) {
+        command = new DeleteRoleFromGroupCmd(roleName, groupName, component);
+      } else if (isGrantPrivilegeRole) {
+        command = new GrantPrivilegeToRoleCmd(roleName, component,
+                privilegeStr, new SolrTSentryPrivilegeConverter(component, service));
+      } else if (isRevokePrivilegeRole) {
+        command = new RevokePrivilegeFromRoleCmd(roleName, component,
+                privilegeStr, new SolrTSentryPrivilegeConverter(component, service));
+      } else if (isListRole) {
+        command = new ListRolesCmd(groupName, component);
+      } else if (isListPrivilege) {
+        command = new ListPrivilegesByRoleCmd(roleName, component,
+                service, new SolrTSentryPrivilegeConverter(component, service));
+      }
 
-    // check the requestor name
-    if (StringUtils.isEmpty(requestorName)) {
-      // The exception message will be recorded in log file.
-      throw new Exception("The requestor name is empty.");
-    }
+      // check the requestor name
+      if (StringUtils.isEmpty(requestorName)) {
+        // The exception message will be recorded in log file.
+        throw new Exception("The requestor name is empty.");
+      }
 
-    if (command != null) {
-      command.execute(client, requestorName);
+      if (command != null) {
+        command.execute(client, requestorName);
+      }
     }
   }
 
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
index c2b03e5..fb8036f 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
@@ -26,7 +26,7 @@
 import org.apache.sentry.core.common.ActiveRoleSet;
 import org.apache.sentry.core.common.Authorizable;
 
-public interface SentryPolicyServiceClient {
+public interface SentryPolicyServiceClient extends AutoCloseable {
 
   void createRole(String requestorUserName, String roleName) throws SentryUserException;
 
@@ -208,8 +208,6 @@
    */
   String getConfigValue(String propertyName, String defaultValue) throws SentryUserException;
 
-  void close();
-
   // Import the sentry mapping data with map structure
   void importPolicy(Map<String, Map<String, Set<String>>> policyFileMappingData,
       String requestorUserName, boolean isOverwriteRole) throws SentryUserException;
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
index d1a4d99..b5b8f82 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
@@ -18,78 +18,62 @@
 
 package org.apache.sentry.provider.db.service.thrift;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Collections;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.sentry.core.common.exception.SentryUserException;
-import org.apache.sentry.core.common.ActiveRoleSet;
-import org.apache.sentry.core.common.Authorizable;
-import org.apache.sentry.core.common.transport.SentryTransportFactory;
-import org.apache.sentry.core.model.db.AccessConstants;
-import org.apache.sentry.core.model.db.DBModelAuthorizable;
-import org.apache.sentry.core.common.utils.PolicyFileConstants;
-import org.apache.sentry.service.thrift.SentryServiceUtil;
-import org.apache.sentry.service.thrift.ServiceConstants;
-import org.apache.sentry.service.thrift.ServiceConstants.PrivilegeScope;
-import org.apache.sentry.service.thrift.ServiceConstants.ThriftConstants;
-import org.apache.sentry.service.thrift.Status;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TMultiplexedProtocol;
-import org.apache.sentry.core.common.transport.SentryServiceClient;
-import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig;
-import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.core.common.ActiveRoleSet;
+import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.core.common.exception.SentryUserException;
+import org.apache.sentry.core.common.transport.SentryConnection;
+import org.apache.sentry.core.common.transport.SentryTransportPool;
+import org.apache.sentry.core.common.transport.TTransportWrapper;
+import org.apache.sentry.core.common.utils.PolicyFileConstants;
+import org.apache.sentry.core.model.db.AccessConstants;
+import org.apache.sentry.core.model.db.DBModelAuthorizable;
+import org.apache.sentry.provider.db.service.thrift.SentryPolicyService.Client;
+import org.apache.sentry.service.thrift.SentryServiceUtil;
+import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
+import org.apache.sentry.service.thrift.ServiceConstants.PrivilegeScope;
+import org.apache.sentry.service.thrift.ServiceConstants.ThriftConstants;
+import org.apache.sentry.service.thrift.Status;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TMultiplexedProtocol;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
- * Sentry Policy Service Client
+ * Client implementation for Policy (HMS) clients.
  * <p>
- * The public implementation of SentryPolicyServiceClient.
- * Note: When using this client, if there is an exception in RPC, socket can get into an inconsistent state
- * So it is important to close and re-open the transportFactory so that new socket is used.
- * When an class is instantiated, there will be transportFactory created connecting with first available
- * server this is configured.
+ * This class is not thread-safe; callers are responsible for ensuring thread safety.
  */
-public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyServiceClient, SentryServiceClient {
+public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyServiceClient, SentryConnection {
 
-  private SentryPolicyService.Client client;
-  private SentryTransportFactory transportFactory;
-  private TTransport transport;
-  private Configuration conf;
+  private Client client;
+  private final SentryTransportPool transportPool;
+  private TTransportWrapper transport;
+  private final long maxMessageSize;
 
-  private static final Logger LOGGER = LoggerFactory
-    .getLogger(SentryPolicyServiceClient.class);
   private static final String THRIFT_EXCEPTION_MESSAGE = "Thrift exception occurred ";
 
   /**
    * Initialize the sentry configurations.
    */
-  public SentryPolicyServiceClientDefaultImpl(Configuration conf, SentryPolicyClientTransportConfig transportConfig)
+  public SentryPolicyServiceClientDefaultImpl(Configuration conf,
+                                              SentryTransportPool transportPool)
     throws IOException {
-    transportFactory = new SentryTransportFactory(conf, transportConfig);
-    this.conf = conf;
-  }
-
-  public SentryPolicyServiceClientDefaultImpl(String addr, int port,
-                                              Configuration conf) throws IOException {
-    transportFactory = new SentryTransportFactory(addr, port, conf,
-      new SentryPolicyClientTransportConfig());
-    this.conf = conf;
-    connect();
+    maxMessageSize = conf.getLong(ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE,
+            ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
+    this.transportPool = transportPool;
   }
 
   /**
@@ -98,24 +82,21 @@
    * @throws IOException
    */
   @Override
-  public synchronized void connect() throws IOException {
-    if (transport != null && transport.isOpen()) {
+  public void connect() throws Exception {
+    if ((transport != null) && transport.isOpen()) {
       return;
     }
 
-    transport = transportFactory.getTransport();
-    long maxMessageSize = conf.getLong(
-      ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE,
-      ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
+    transport = transportPool.getTransport();
     TMultiplexedProtocol protocol = new TMultiplexedProtocol(
-      new TBinaryProtocol(transport, maxMessageSize, maxMessageSize, true, true),
-      SentryPolicyStoreProcessor.SENTRY_POLICY_SERVICE_NAME);
-    client = new SentryPolicyService.Client(protocol);
-    LOGGER.debug("Successfully created client");
+      new TBinaryProtocol(transport.getTTransport(), maxMessageSize, maxMessageSize,
+              true, true),
+            SentryPolicyStoreProcessor.SENTRY_POLICY_SERVICE_NAME);
+    client = new Client(protocol);
   }
 
   @Override
-  public synchronized void createRole(String requestorUserName, String roleName)
+  public void createRole(String requestorUserName, String roleName)
     throws SentryUserException {
     TCreateSentryRoleRequest request = new TCreateSentryRoleRequest();
     request.setProtocol_version(ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT);
@@ -130,20 +111,20 @@
   }
 
   @Override
-  public synchronized void dropRole(String requestorUserName,
+  public void dropRole(String requestorUserName,
                                     String roleName)
     throws SentryUserException {
     dropRole(requestorUserName, roleName, false);
   }
 
   @Override
-  public synchronized void dropRoleIfExists(String requestorUserName,
+  public void dropRoleIfExists(String requestorUserName,
                                             String roleName)
     throws SentryUserException {
     dropRole(requestorUserName, roleName, true);
   }
 
-  private synchronized void dropRole(String requestorUserName,
+  private void dropRole(String requestorUserName,
                                      String roleName, boolean ifExists)
     throws SentryUserException {
     TDropSentryRoleRequest request = new TDropSentryRoleRequest();
@@ -171,7 +152,7 @@
    * @throws SentryUserException
    */
   @Override
-  public synchronized Set<TSentryRole> listRolesByGroupName(
+  public Set<TSentryRole> listRolesByGroupName(
     String requestorUserName,
     String groupName)
     throws SentryUserException {
@@ -219,7 +200,7 @@
   }
 
   @Override
-  public synchronized Set<TSentryPrivilege> listAllPrivilegesByRoleName(String requestorUserName,
+  public Set<TSentryPrivilege> listAllPrivilegesByRoleName(String requestorUserName,
                                                                         String roleName)
     throws SentryUserException {
     return listPrivilegesByRoleName(requestorUserName, roleName, null);
@@ -235,7 +216,7 @@
    * @throws SentryUserException
    */
   @Override
-  public synchronized Set<TSentryPrivilege> listPrivilegesByRoleName(String requestorUserName,
+  public Set<TSentryPrivilege> listPrivilegesByRoleName(String requestorUserName,
                                                                      String roleName, List<? extends Authorizable> authorizable)
     throws SentryUserException {
     TListSentryPrivilegesRequest request = new TListSentryPrivilegesRequest();
@@ -257,13 +238,13 @@
   }
 
   @Override
-  public synchronized Set<TSentryRole> listRoles(String requestorUserName)
+  public Set<TSentryRole> listRoles(String requestorUserName)
     throws SentryUserException {
     return listRolesByGroupName(requestorUserName, null);
   }
 
   @Override
-  public synchronized Set<TSentryRole> listUserRoles(String requestorUserName)
+  public Set<TSentryRole> listUserRoles(String requestorUserName)
     throws SentryUserException {
     Set<TSentryRole> tSentryRoles = Sets.newHashSet();
     tSentryRoles.addAll(listRolesByGroupName(requestorUserName, AccessConstants.ALL));
@@ -272,7 +253,7 @@
   }
 
   @Override
-  public synchronized TSentryPrivilege grantURIPrivilege(String requestorUserName,
+  public TSentryPrivilege grantURIPrivilege(String requestorUserName,
                                                          String roleName, String server, String uri)
     throws SentryUserException {
     return grantPrivilege(requestorUserName, roleName,
@@ -280,7 +261,7 @@
   }
 
   @Override
-  public synchronized TSentryPrivilege grantURIPrivilege(String requestorUserName,
+  public TSentryPrivilege grantURIPrivilege(String requestorUserName,
                                                          String roleName, String server, String uri, Boolean grantOption)
     throws SentryUserException {
     return grantPrivilege(requestorUserName, roleName,
@@ -288,7 +269,7 @@
   }
 
   @Override
-  public synchronized void grantServerPrivilege(String requestorUserName,
+  public void grantServerPrivilege(String requestorUserName,
                                                 String roleName, String server, String action)
     throws SentryUserException {
 
@@ -307,14 +288,14 @@
    * Should use grantServerPrivilege(String requestorUserName,
    *  String roleName, String server, String action, Boolean grantOption)
    */
-  public synchronized TSentryPrivilege grantServerPrivilege(String requestorUserName,
+  public TSentryPrivilege grantServerPrivilege(String requestorUserName,
                                                             String roleName, String server, Boolean grantOption) throws SentryUserException {
     return grantServerPrivilege(requestorUserName, roleName, server,
       AccessConstants.ALL, grantOption);
   }
 
   @Override
-  public synchronized TSentryPrivilege grantServerPrivilege(String requestorUserName,
+  public TSentryPrivilege grantServerPrivilege(String requestorUserName,
                                                             String roleName, String server, String action, Boolean grantOption)
     throws SentryUserException {
 
@@ -329,7 +310,7 @@
   }
 
   @Override
-  public synchronized TSentryPrivilege grantDatabasePrivilege(String requestorUserName,
+  public TSentryPrivilege grantDatabasePrivilege(String requestorUserName,
                                                               String roleName, String server, String db, String action)
     throws SentryUserException {
     return grantPrivilege(requestorUserName, roleName,
@@ -337,7 +318,7 @@
   }
 
   @Override
-  public synchronized TSentryPrivilege grantDatabasePrivilege(String requestorUserName,
+  public TSentryPrivilege grantDatabasePrivilege(String requestorUserName,
                                                               String roleName, String server, String db, String action, Boolean grantOption)
     throws SentryUserException {
     return grantPrivilege(requestorUserName, roleName,
@@ -345,7 +326,7 @@
   }
 
   @Override
-  public synchronized TSentryPrivilege grantTablePrivilege(String requestorUserName,
+  public TSentryPrivilege grantTablePrivilege(String requestorUserName,
                                                            String roleName, String server, String db, String table, String action)
     throws SentryUserException {
     return grantPrivilege(requestorUserName, roleName, PrivilegeScope.TABLE, server,
@@ -354,7 +335,7 @@
   }
 
   @Override
-  public synchronized TSentryPrivilege grantTablePrivilege(String requestorUserName,
+  public TSentryPrivilege grantTablePrivilege(String requestorUserName,
                                                            String roleName, String server, String db, String table, String action, Boolean grantOption)
     throws SentryUserException {
     return grantPrivilege(requestorUserName, roleName, PrivilegeScope.TABLE, server,
@@ -362,7 +343,7 @@
   }
 
   @Override
-  public synchronized TSentryPrivilege grantColumnPrivilege(String requestorUserName,
+  public TSentryPrivilege grantColumnPrivilege(String requestorUserName,
                                                             String roleName, String server, String db, String table, String columnName, String action)
     throws SentryUserException {
     return grantPrivilege(requestorUserName, roleName, PrivilegeScope.COLUMN, server,
@@ -371,7 +352,7 @@
   }
 
   @Override
-  public synchronized TSentryPrivilege grantColumnPrivilege(String requestorUserName,
+  public TSentryPrivilege grantColumnPrivilege(String requestorUserName,
                                                             String roleName, String server, String db, String table, String columnName, String action, Boolean grantOption)
     throws SentryUserException {
     return grantPrivilege(requestorUserName, roleName, PrivilegeScope.COLUMN, server,
@@ -379,7 +360,7 @@
   }
 
   @Override
-  public synchronized Set<TSentryPrivilege> grantColumnsPrivileges(String requestorUserName,
+  public Set<TSentryPrivilege> grantColumnsPrivileges(String requestorUserName,
                                                                    String roleName, String server, String db, String table, List<String> columnNames, String action)
     throws SentryUserException {
     return grantPrivileges(requestorUserName, roleName, PrivilegeScope.COLUMN, server,
@@ -388,7 +369,7 @@
   }
 
   @Override
-  public synchronized Set<TSentryPrivilege> grantColumnsPrivileges(String requestorUserName,
+  public Set<TSentryPrivilege> grantColumnsPrivileges(String requestorUserName,
                                                                    String roleName, String server, String db, String table, List<String> columnNames, String action, Boolean grantOption)
     throws SentryUserException {
     return grantPrivileges(requestorUserName, roleName, PrivilegeScope.COLUMN,
@@ -397,14 +378,14 @@
   }
 
   @Override
-  public synchronized Set<TSentryPrivilege> grantPrivileges(
+  public Set<TSentryPrivilege> grantPrivileges(
     String requestorUserName, String roleName,
     Set<TSentryPrivilege> privileges) throws SentryUserException {
     return grantPrivilegesCore(requestorUserName, roleName, privileges);
   }
 
   @Override
-  public synchronized TSentryPrivilege grantPrivilege(String requestorUserName, String roleName,
+  public TSentryPrivilege grantPrivilege(String requestorUserName, String roleName,
                                                       TSentryPrivilege privilege) throws SentryUserException {
     return grantPrivilegeCore(requestorUserName, roleName, privilege);
   }
@@ -500,12 +481,12 @@
   }
 
   @Override
-  public synchronized void revokePrivileges(String requestorUserName, String roleName, Set<TSentryPrivilege> privileges) throws SentryUserException {
+  public void revokePrivileges(String requestorUserName, String roleName, Set<TSentryPrivilege> privileges) throws SentryUserException {
     this.revokePrivilegesCore(requestorUserName, roleName, privileges);
   }
 
   @Override
-  public synchronized void revokePrivilege(String requestorUserName, String roleName, TSentryPrivilege privilege) throws SentryUserException {
+  public void revokePrivilege(String requestorUserName, String roleName, TSentryPrivilege privilege) throws SentryUserException {
     this.revokePrivilegeCore(requestorUserName, roleName, privilege);
 
   }
@@ -530,7 +511,7 @@
   }
 
   @Override
-  public synchronized void revokeURIPrivilege(String requestorUserName,
+  public void revokeURIPrivilege(String requestorUserName,
                                               String roleName, String server, String uri)
     throws SentryUserException {
     revokePrivilege(requestorUserName, roleName,
@@ -538,7 +519,7 @@
   }
 
   @Override
-  public synchronized void revokeURIPrivilege(String requestorUserName,
+  public void revokeURIPrivilege(String requestorUserName,
                                               String roleName, String server, String uri, Boolean grantOption)
     throws SentryUserException {
     revokePrivilege(requestorUserName, roleName,
@@ -546,7 +527,7 @@
   }
 
   @Override
-  public synchronized void revokeServerPrivilege(String requestorUserName,
+  public void revokeServerPrivilege(String requestorUserName,
                                                  String roleName, String server, String action)
     throws SentryUserException {
 
@@ -560,7 +541,7 @@
       PrivilegeScope.SERVER, server, null, null, null, null, action);
   }
 
-  public synchronized void revokeServerPrivilege(String requestorUserName,
+  public void revokeServerPrivilege(String requestorUserName,
                                                  String roleName, String server, String action, Boolean grantOption)
     throws SentryUserException {
 
@@ -580,7 +561,7 @@
    *  String roleName, String server, String action, Boolean grantOption)
    */
   @Override
-  public synchronized void revokeServerPrivilege(String requestorUserName,
+  public void revokeServerPrivilege(String requestorUserName,
                                                  String roleName, String server, boolean grantOption)
     throws SentryUserException {
     revokePrivilege(requestorUserName, roleName,
@@ -588,7 +569,7 @@
   }
 
   @Override
-  public synchronized void revokeDatabasePrivilege(String requestorUserName,
+  public void revokeDatabasePrivilege(String requestorUserName,
                                                    String roleName, String server, String db, String action)
     throws SentryUserException {
     revokePrivilege(requestorUserName, roleName,
@@ -596,7 +577,7 @@
   }
 
   @Override
-  public synchronized void revokeDatabasePrivilege(String requestorUserName,
+  public void revokeDatabasePrivilege(String requestorUserName,
                                                    String roleName, String server, String db, String action, Boolean grantOption)
     throws SentryUserException {
     revokePrivilege(requestorUserName, roleName,
@@ -604,7 +585,7 @@
   }
 
   @Override
-  public synchronized void revokeTablePrivilege(String requestorUserName,
+  public void revokeTablePrivilege(String requestorUserName,
                                                 String roleName, String server, String db, String table, String action)
     throws SentryUserException {
     revokePrivilege(requestorUserName, roleName,
@@ -613,7 +594,7 @@
   }
 
   @Override
-  public synchronized void revokeTablePrivilege(String requestorUserName,
+  public void revokeTablePrivilege(String requestorUserName,
                                                 String roleName, String server, String db, String table, String action, Boolean grantOption)
     throws SentryUserException {
     revokePrivilege(requestorUserName, roleName,
@@ -622,7 +603,7 @@
   }
 
   @Override
-  public synchronized void revokeColumnPrivilege(String requestorUserName, String roleName,
+  public void revokeColumnPrivilege(String requestorUserName, String roleName,
                                                  String server, String db, String table, String columnName, String action)
     throws SentryUserException {
     ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
@@ -633,7 +614,7 @@
   }
 
   @Override
-  public synchronized void revokeColumnPrivilege(String requestorUserName, String roleName,
+  public void revokeColumnPrivilege(String requestorUserName, String roleName,
                                                  String server, String db, String table, String columnName, String action, Boolean grantOption)
     throws SentryUserException {
     ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
@@ -644,7 +625,7 @@
   }
 
   @Override
-  public synchronized void revokeColumnsPrivilege(String requestorUserName, String roleName,
+  public void revokeColumnsPrivilege(String requestorUserName, String roleName,
                                                   String server, String db, String table, List<String> columns, String action)
     throws SentryUserException {
     revokePrivilege(requestorUserName, roleName,
@@ -653,7 +634,7 @@
   }
 
   @Override
-  public synchronized void revokeColumnsPrivilege(String requestorUserName, String roleName,
+  public void revokeColumnsPrivilege(String requestorUserName, String roleName,
                                                   String server, String db, String table, List<String> columns, String action, Boolean grantOption)
     throws SentryUserException {
     revokePrivilege(requestorUserName, roleName,
@@ -741,7 +722,7 @@
   }
 
   @Override
-  public synchronized Set<String> listPrivilegesForProvider
+  public Set<String> listPrivilegesForProvider
     (Set<String> groups, Set<String> users,
      ActiveRoleSet roleSet, Authorizable... authorizable) throws SentryUserException {
     TSentryActiveRoleSet thriftRoleSet = new TSentryActiveRoleSet(roleSet.isAll(), roleSet.getRoles());
@@ -766,21 +747,21 @@
   }
 
   @Override
-  public synchronized void grantRoleToGroup(String requestorUserName,
+  public void grantRoleToGroup(String requestorUserName,
                                             String groupName, String roleName)
     throws SentryUserException {
     grantRoleToGroups(requestorUserName, roleName, Sets.newHashSet(groupName));
   }
 
   @Override
-  public synchronized void revokeRoleFromGroup(String requestorUserName,
+  public void revokeRoleFromGroup(String requestorUserName,
                                                String groupName, String roleName)
     throws SentryUserException {
     revokeRoleFromGroups(requestorUserName, roleName, Sets.newHashSet(groupName));
   }
 
   @Override
-  public synchronized void grantRoleToGroups(String requestorUserName,
+  public void grantRoleToGroups(String requestorUserName,
                                              String roleName, Set<String> groups)
     throws SentryUserException {
     TAlterSentryRoleAddGroupsRequest request = new TAlterSentryRoleAddGroupsRequest(
@@ -795,7 +776,7 @@
   }
 
   @Override
-  public synchronized void revokeRoleFromGroups(String requestorUserName,
+  public void revokeRoleFromGroups(String requestorUserName,
                                                 String roleName, Set<String> groups)
     throws SentryUserException {
     TAlterSentryRoleDeleteGroupsRequest request = new TAlterSentryRoleDeleteGroupsRequest(
@@ -810,19 +791,19 @@
   }
 
   @Override
-  public synchronized void grantRoleToUser(String requestorUserName, String userName,
+  public void grantRoleToUser(String requestorUserName, String userName,
                                            String roleName) throws SentryUserException {
     grantRoleToUsers(requestorUserName, roleName, Sets.newHashSet(userName));
   }
 
   @Override
-  public synchronized void revokeRoleFromUser(String requestorUserName, String userName,
+  public void revokeRoleFromUser(String requestorUserName, String userName,
                                               String roleName) throws SentryUserException {
     revokeRoleFromUsers(requestorUserName, roleName, Sets.newHashSet(userName));
   }
 
   @Override
-  public synchronized void grantRoleToUsers(String requestorUserName, String roleName,
+  public void grantRoleToUsers(String requestorUserName, String roleName,
                                             Set<String> users) throws SentryUserException {
     TAlterSentryRoleAddUsersRequest request = new TAlterSentryRoleAddUsersRequest(
       ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, requestorUserName, roleName, users);
@@ -835,7 +816,7 @@
   }
 
   @Override
-  public synchronized void revokeRoleFromUsers(String requestorUserName, String roleName,
+  public void revokeRoleFromUsers(String requestorUserName, String roleName,
                                                Set<String> users) throws SentryUserException {
     TAlterSentryRoleDeleteUsersRequest request = new TAlterSentryRoleDeleteUsersRequest(
       ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, requestorUserName, roleName, users);
@@ -858,7 +839,7 @@
   }
 
   @Override
-  public synchronized void dropPrivileges(String requestorUserName,
+  public void dropPrivileges(String requestorUserName,
                                           List<? extends Authorizable> authorizableObjects)
     throws SentryUserException {
     TSentryAuthorizable tSentryAuthorizable = setupSentryAuthorizable(authorizableObjects);
@@ -875,7 +856,7 @@
   }
 
   @Override
-  public synchronized void renamePrivileges(String requestorUserName,
+  public void renamePrivileges(String requestorUserName,
                                             List<? extends Authorizable> oldAuthorizables,
                                             List<? extends Authorizable> newAuthorizables) throws SentryUserException {
     TSentryAuthorizable tOldSentryAuthorizable = setupSentryAuthorizable(oldAuthorizables);
@@ -894,7 +875,7 @@
   }
 
   @Override
-  public synchronized Map<TSentryAuthorizable, TSentryPrivilegeMap> listPrivilegsbyAuthorizable
+  public Map<TSentryAuthorizable, TSentryPrivilegeMap> listPrivilegsbyAuthorizable
     (
       String requestorUserName,
       Set<List<? extends Authorizable>> authorizables, Set<String> groups,
@@ -937,7 +918,7 @@
    */
 
   @Override
-  public synchronized String getConfigValue(String propertyName, String defaultValue)
+  public String getConfigValue(String propertyName, String defaultValue)
     throws SentryUserException {
     TSentryConfigValueRequest request = new TSentryConfigValueRequest(
       ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, propertyName);
@@ -977,7 +958,7 @@
    * @param requestorUserName     The name of the request user
    */
   @Override
-  public synchronized void importPolicy
+  public void importPolicy
   (Map<String, Map<String, Set<String>>> policyFileMappingData,
    String requestorUserName, boolean isOverwriteRole)
     throws SentryUserException {
@@ -1022,7 +1003,7 @@
 
   // export the sentry mapping data with map structure
   @Override
-  public synchronized Map<String, Map<String, Set<String>>> exportPolicy(String
+  public Map<String, Map<String, Set<String>>> exportPolicy(String
                                                                            requestorUserName,
                                                                          String objectPath) throws SentryUserException {
     TSentryExportMappingDataRequest request = new TSentryExportMappingDataRequest(
@@ -1065,12 +1046,23 @@
   }
 
   @Override
-  public synchronized void close() {
-    transportFactory.close();
+  public void close() {
+    done();
   }
 
   @Override
-  public void disconnect() {
-    transportFactory.releaseTransport();
+  public void done() {
+    if (transport != null) {
+      transportPool.returnTransport(transport);
+      transport = null;
+    }
+  }
+
+  @Override
+  public void invalidate() {
+    if (transport != null) {
+      transportPool.invalidateTransport(transport);
+      transport = null;
+    }
   }
 }
\ No newline at end of file
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/tools/SentryShellHive.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/tools/SentryShellHive.java
index 1d09846..09f17ed 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/tools/SentryShellHive.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/tools/SentryShellHive.java
@@ -40,36 +40,39 @@
 
   public void run() throws Exception {
     Command command = null;
-    SentryPolicyServiceClient client = SentryServiceClientFactory.create(getSentryConf());
-    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
-    String requestorName = ugi.getShortUserName();
 
-    if (isCreateRole) {
-      command = new CreateRoleCmd(roleName);
-    } else if (isDropRole) {
-      command = new DropRoleCmd(roleName);
-    } else if (isAddRoleGroup) {
-      command = new GrantRoleToGroupsCmd(roleName, groupName);
-    } else if (isDeleteRoleGroup) {
-      command = new RevokeRoleFromGroupsCmd(roleName, groupName);
-    } else if (isGrantPrivilegeRole) {
-      command = new GrantPrivilegeToRoleCmd(roleName, privilegeStr);
-    } else if (isRevokePrivilegeRole) {
-      command = new RevokePrivilegeFromRoleCmd(roleName, privilegeStr);
-    } else if (isListRole) {
-      command = new ListRolesCmd(groupName);
-    } else if (isListPrivilege) {
-      command = new ListPrivilegesCmd(roleName);
-    }
+    try(SentryPolicyServiceClient client =
+                SentryServiceClientFactory.create(getSentryConf())) {
+      UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+      String requestorName = ugi.getShortUserName();
 
-    // check the requestor name
-    if (StringUtils.isEmpty(requestorName)) {
-      // The exception message will be recoreded in log file.
-      throw new Exception("The requestor name is empty.");
-    }
+      if (isCreateRole) {
+        command = new CreateRoleCmd(roleName);
+      } else if (isDropRole) {
+        command = new DropRoleCmd(roleName);
+      } else if (isAddRoleGroup) {
+        command = new GrantRoleToGroupsCmd(roleName, groupName);
+      } else if (isDeleteRoleGroup) {
+        command = new RevokeRoleFromGroupsCmd(roleName, groupName);
+      } else if (isGrantPrivilegeRole) {
+        command = new GrantPrivilegeToRoleCmd(roleName, privilegeStr);
+      } else if (isRevokePrivilegeRole) {
+        command = new RevokePrivilegeFromRoleCmd(roleName, privilegeStr);
+      } else if (isListRole) {
+        command = new ListRolesCmd(groupName);
+      } else if (isListPrivilege) {
+        command = new ListPrivilegesCmd(roleName);
+      }
 
-    if (command != null) {
-      command.execute(client, requestorName);
+      // check the requestor name
+      if (StringUtils.isEmpty(requestorName)) {
+        // The exception message will be recorded in the log file.
+        throw new Exception("The requestor name is empty.");
+      }
+
+      if (command != null) {
+        command.execute(client, requestorName);
+      }
     }
   }
 
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
deleted file mode 100644
index acf9b05..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.service.thrift;
-
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-
-import com.google.common.net.HostAndPort;
-import org.apache.commons.pool2.PooledObjectFactory;
-import org.apache.commons.pool2.impl.AbandonedConfig;
-import org.apache.commons.pool2.impl.GenericObjectPool;
-import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.core.common.exception.SentryUserException;
-import org.apache.sentry.core.common.transport.SentryClientInvocationHandler;
-import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
-import org.apache.sentry.core.common.utils.ThriftUtil;
-import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The PoolClientInvocationHandler is a proxy class for handling thrift
- * call. For every thrift call, get the instance of
- * SentryPolicyServiceBaseClient from the commons-pool, and return the instance
- * to the commons-pool after complete the call. For any exception with the call,
- * discard the instance and create a new one added to the commons-pool. Then,
- * get the instance and do the call again. For the thread safe, the commons-pool
- * will manage the connection pool, and every thread can get the connection by
- * borrowObject() and return the connection to the pool by returnObject().
- *
- * TODO: Current pool model does not manage the opening connections very well,
- * e.g. opening connections with failed servers should be closed promptly.
- */
-
-public class PoolClientInvocationHandler extends SentryClientInvocationHandler {
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(PoolClientInvocationHandler.class);
-
-  private static final String POOL_EXCEPTION_MESSAGE = "Pool exception occurred ";
-
-  private final Configuration conf;
-
-  /**
-   * The configuration to use for our object pools.
-   * Null if we are not using object pools.
-   */
-  private final GenericObjectPoolConfig poolConfig;
-
-  /**
-   * The total number of connection retries to attempt per endpoint.
-   */
-  private final int connectionRetryTotal;
-
-  /**
-   * The configured sentry servers.
-   */
-  private final Endpoint[] endpoints;
-
-  /**
-   * The endpoint which we are currently using.  This can be read without any locks.
-   * It must be written while holding the endpoints lock.
-   */
-  private volatile int freshestEndpointIdx = 0;
-
-  private class Endpoint {
-    /**
-     * The server address or hostname.
-     */
-    private final String addr;
-
-    /**
-     * The server port.
-     */
-    private final int port;
-
-    /**
-     * The server's poolFactory used to create new clients.
-     */
-    private final PooledObjectFactory<SentryPolicyServiceClient> poolFactory;
-
-    /**
-     * The server's pool of cached clients.
-     */
-    private final GenericObjectPool<SentryPolicyServiceClient> pool;
-
-    Endpoint(String addr, int port) {
-      this.addr = addr;
-      this.port = port;
-      this.poolFactory = new SentryServiceClientPoolFactory(addr, port, conf);
-      this.pool = new GenericObjectPool<SentryPolicyServiceClient>(
-          this.poolFactory, poolConfig, new AbandonedConfig());
-    }
-
-    GenericObjectPool<SentryPolicyServiceClient> getPool() {
-      return pool;
-    }
-
-    String getEndPointStr() {
-      return new String("endpoint at [address " + addr + ", port " + port + "]");
-    }
-  }
-
-  public PoolClientInvocationHandler(Configuration conf) throws Exception {
-    this.conf = conf;
-
-    this.poolConfig = new GenericObjectPoolConfig();
-    // config the pool size for commons-pool
-    this.poolConfig.setMaxTotal(conf.getInt(ClientConfig.SENTRY_POOL_MAX_TOTAL,
-        ClientConfig.SENTRY_POOL_MAX_TOTAL_DEFAULT));
-    this.poolConfig.setMinIdle(conf.getInt(ClientConfig.SENTRY_POOL_MIN_IDLE,
-        ClientConfig.SENTRY_POOL_MIN_IDLE_DEFAULT));
-    this.poolConfig.setMaxIdle(conf.getInt(ClientConfig.SENTRY_POOL_MAX_IDLE,
-        ClientConfig.SENTRY_POOL_MAX_IDLE_DEFAULT));
-
-    // get the retry number for reconnecting service
-    this.connectionRetryTotal = conf.getInt(ClientConfig.SENTRY_POOL_RETRY_TOTAL,
-        ClientConfig.SENTRY_POOL_RETRY_TOTAL_DEFAULT);
-
-    String hostsAndPortsStr = conf.get(ClientConfig.SERVER_RPC_ADDRESS);
-    if (hostsAndPortsStr == null) {
-      throw new RuntimeException("Config key " +
-          ClientConfig.SERVER_RPC_ADDRESS + " is required");
-    }
-    int defaultPort = conf.getInt(ClientConfig.SERVER_RPC_PORT,
-        ClientConfig.SERVER_RPC_PORT_DEFAULT);
-    String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
-    HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, defaultPort);
-    this.endpoints = new Endpoint[hostsAndPorts.length];
-    for (int i = 0; i < this.endpoints.length; i++) {
-      this.endpoints[i] = new Endpoint(hostsAndPorts[i].getHostText(),hostsAndPorts[i].getPort());
-      LOGGER.info("Initiate sentry sever endpoint: hostname " +
-          hostsAndPorts[i].getHostText() + ", port " + hostsAndPorts[i].getPort());
-    }
-  }
-
-  @Override
-  public Object invokeImpl(Object proxy, Method method, Object[] args)
-      throws Exception {
-    int retryCount = 0;
-    /**
-     * The maximum number of retries that we will do.  Each endpoint gets its
-     * own set of retries.
-     */
-    int retryLimit = connectionRetryTotal * endpoints.length;
-
-    /**
-     * The index of the endpoint to use.
-     */
-    int endpointIdx = freshestEndpointIdx;
-
-    /**
-     * A list of exceptions from each endpoint.  This starts as null to avoid
-     * memory allocation in the common case where there is no error.
-     */
-    Exception exc[] = null;
-
-    Object ret = null;
-
-    while (retryCount < retryLimit) {
-      GenericObjectPool<SentryPolicyServiceClient> pool =
-          endpoints[endpointIdx].getPool();
-      try {
-        if ((exc != null) &&
-            (exc[endpointIdx] instanceof TTransportException)) {
-          // If there was a TTransportException last time we tried to contact
-          // this endpoint, attempt to create a new connection before we try
-          // again.
-          synchronized (endpoints) {
-            // If there has room, create new instance and add it to the
-            // commons-pool.  This instance will be returned first from the
-            // commons-pool, because the configuration is LIFO.
-            if (pool.getNumIdle() + pool.getNumActive() < pool.getMaxTotal()) {
-              pool.addObject();
-            }
-          }
-        }
-        // Try to make the RPC.
-        ret = invokeFromPool(method, args, pool);
-        break;
-      } catch (TTransportException e) {
-        if (exc == null) {
-          exc = new Exception[endpoints.length];
-        }
-        exc[endpointIdx] = e;
-      }
-
-      Exception lastExc = exc[endpointIdx];
-      synchronized (endpoints) {
-        int curFreshestEndpointIdx = freshestEndpointIdx;
-        if (curFreshestEndpointIdx == endpointIdx) {
-          curFreshestEndpointIdx =
-              (curFreshestEndpointIdx  + 1) %  endpoints.length;
-          freshestEndpointIdx = curFreshestEndpointIdx;
-        }
-        endpointIdx = curFreshestEndpointIdx;
-      }
-      // Increase the retry num, and throw the exception if can't retry again.
-      retryCount++;
-      if (retryCount == connectionRetryTotal) {
-        for (int i = 0; i < exc.length; i++) {
-          // Since freshestEndpointIdx is shared by multiple threads, it is possible that
-          // the ith endpoint has been tried in another thread and skipped in the current
-          // thread.
-          if (exc[i] != null) {
-            LOGGER.error("Sentry server " + endpoints[i].getEndPointStr()
-                + " is in unreachable.");
-          }
-        }
-        throw new SentryUserException("Sentry servers are unreachable. " +
-            "Diagnostics is needed for unreachable servers.", lastExc);
-      }
-    }
-    return ret;
-  }
-
-  private Object invokeFromPool(Method method, Object[] args,
-      GenericObjectPool<SentryPolicyServiceClient> pool) throws Exception {
-    Object result = null;
-    SentryPolicyServiceClient client;
-    try {
-      // get the connection from the pool, don't know if the connection is broken.
-      client = pool.borrowObject();
-    } catch (Exception e) {
-      LOGGER.debug(POOL_EXCEPTION_MESSAGE, e);
-      // If the exception is caused by connection problem, throw the TTransportException
-      // for reconnect.
-      if (e instanceof IOException) {
-        throw new TTransportException(e);
-      }
-      throw new SentryUserException(e.getMessage(), e);
-    }
-    try {
-      // do the thrift call
-      result = method.invoke(client, args);
-    } catch (InvocationTargetException e) {
-      // Get the target exception, check if SentryUserException or TTransportException is wrapped.
-      // TTransportException or IOException means there has connection problem with the pool.
-      Throwable targetException = e.getCause();
-      if (targetException instanceof SentryUserException) {
-        Throwable sentryTargetException = targetException.getCause();
-        // If there has connection problem, eg, invalid connection if the service restarted,
-        // sentryTargetException instanceof TTransportException or IOException = true.
-        if (sentryTargetException instanceof TTransportException
-            || sentryTargetException instanceof IOException) {
-          // If the exception is caused by connection problem, destroy the instance and
-          // remove it from the commons-pool. Throw the TTransportException for reconnect.
-          pool.invalidateObject(client);
-          throw new TTransportException(sentryTargetException);
-        }
-        // The exception is thrown by thrift call, eg, SentryAccessDeniedException.
-        throw (SentryUserException) targetException;
-      }
-      throw e;
-    } finally{
-      try {
-        // return the instance to commons-pool
-        pool.returnObject(client);
-      } catch (Exception e) {
-        LOGGER.error(POOL_EXCEPTION_MESSAGE, e);
-        throw e;
-      }
-    }
-    return result;
-  }
-
-  @Override
-  public void close() {
-    for (int i = 0; i < endpoints.length; i++) {
-      try {
-        endpoints[i].getPool().close();
-      } catch (Exception e) {
-        LOGGER.debug(POOL_EXCEPTION_MESSAGE, e);
-      }
-    }
-  }
-}
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
index 9beb07b..ec938da 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
@@ -313,7 +313,8 @@
       hmsFollowerExecutor.scheduleAtFixedRate(hmsFollower,
               initDelay, period, TimeUnit.MILLISECONDS);
     } catch (IllegalArgumentException e) {
-      LOGGER.error(String.format("Could not start HMSFollower due to illegal argument. period is %s ms", period), e);
+      LOGGER.error(String.format("Could not start HMSFollower due to illegal argument. period is %s ms",
+              period), e);
       throw e;
     }
   }
@@ -381,7 +382,7 @@
       sentryStoreCleanService.scheduleWithFixedDelay(
               storeCleaner, 0, storeCleanPeriodSecs, TimeUnit.SECONDS);
 
-      LOGGER.info("sentry store cleaner is scheduled with interval %d seconds", storeCleanPeriodSecs);
+      LOGGER.info("sentry store cleaner is scheduled with interval {} seconds", storeCleanPeriodSecs);
     }
     catch(IllegalArgumentException e){
       LOGGER.error("Could not start SentryStoreCleaner due to illegal argument", e);
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
index 7db9310..f3aa587 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
@@ -19,36 +19,120 @@
 package org.apache.sentry.service.thrift;
 
 import java.lang.reflect.Proxy;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.sentry.core.common.transport.RetryClientInvocationHandler;
 import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig;
+import org.apache.sentry.core.common.transport.SentryTransportFactory;
+import org.apache.sentry.core.common.transport.SentryTransportPool;
 import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
 import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl;
-import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.ThreadSafe;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.apache.sentry.core.common.utils.SentryConstants.KERBEROS_MODE;
+
+@ThreadSafe
 public final class SentryServiceClientFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SentryServiceClientFactory.class);
+
   private static final SentryPolicyClientTransportConfig transportConfig =
           new SentryPolicyClientTransportConfig();
+  private final Configuration conf;
+  private final SentryTransportPool transportPool;
 
-  private SentryServiceClientFactory() {
+  private static final AtomicReference<SentryServiceClientFactory> clientFactory =
+          new AtomicReference<>();
+
+  /**
+   * Create a client instance. The supplied configuration is only used the first time and
+   * ignored afterwards. Tests that want to supply different configurations
+   * should call {@link #factoryReset(SentryServiceClientFactory)} to force new configuration
+   * read.
+   * @param conf configuration used to build the shared factory on first use
+   * @return client instance produced by the shared factory
+   * @throws Exception if creating the client fails
+   */
+  public static SentryPolicyServiceClient create(Configuration conf) throws Exception {
+    SentryServiceClientFactory factory = clientFactory.get();
+    if (factory != null) {
+      return factory.create();
+    }
+    factory = new SentryServiceClientFactory(conf);
+    boolean ok = clientFactory.compareAndSet(null, factory);
+    if (ok) {
+      return factory.create();
+    }
+    // A factory was installed concurrently: close the one we just built and use the winner
+    factory.close();
+    return clientFactory.get().create();
   }
 
-  public static SentryPolicyServiceClient create(Configuration conf) throws Exception {
-    boolean pooled = conf.getBoolean(
-      ClientConfig.SENTRY_POOL_ENABLED, ClientConfig.SENTRY_POOL_ENABLED_DEFAULT);
-    if (pooled) {
-      return (SentryPolicyServiceClient) Proxy
-        .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(),
-          SentryPolicyServiceClientDefaultImpl.class.getInterfaces(),
-          new PoolClientInvocationHandler(conf));
-    } else {
-      return (SentryPolicyServiceClient) Proxy
-        .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(),
-          SentryPolicyServiceClientDefaultImpl.class.getInterfaces(),
-          new RetryClientInvocationHandler(conf,
-            new SentryPolicyServiceClientDefaultImpl(conf,transportConfig), transportConfig));
+  /**
+   * Builds the factory and the transport pool handed to every client it creates. When Kerberos
+   * is enabled, the stored configuration has HADOOP_SECURITY_AUTHENTICATION set to Kerberos.
+   * @param conf client configuration; copied rather than mutated when the override is applied
+   */
+  private SentryServiceClientFactory(Configuration conf) {
+    Configuration clientConf = conf;
+
+    // When Kerberos is enabled, UserGroupInformation should have been initialized with the
+    // HADOOP_SECURITY_AUTHENTICATION property, but not every caller does so. Rather than rely
+    // on callers, do it here, up front, instead of in the transport code at connect time.
+    boolean useKerberos = transportConfig.isKerberosEnabled(conf);
+
+    if (useKerberos) {
+      LOGGER.info("Using Kerberos authentication");
+      String authMode = conf.get(HADOOP_SECURITY_AUTHENTICATION, "");
+      // Compare by value; '!=' on strings tests identity, not content.
+      if (!KERBEROS_MODE.equals(authMode)) {
+        // Force auth mode to be Kerberos
+        clientConf = new Configuration(conf);
+        clientConf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS_MODE);
+      }
+    }
+
+    this.conf = clientConf;
+
+    boolean useUGI = transportConfig.useUserGroupInformation(conf);
+    if (useUGI) {
+      LOGGER.info("Using UserGroupInformation authentication");
+      UserGroupInformation.setConfiguration(this.conf);
+    }
+
+    transportPool = new SentryTransportPool(conf, transportConfig,
+            new SentryTransportFactory(conf, transportConfig));
+  }
+
+  private SentryPolicyServiceClient create() throws Exception {
+    Class<?> impl = SentryPolicyServiceClientDefaultImpl.class;
+    SentryPolicyServiceClientDefaultImpl target =
+        new SentryPolicyServiceClientDefaultImpl(conf, transportPool);
+    return (SentryPolicyServiceClient) Proxy.newProxyInstance(impl.getClassLoader(),
+        impl.getInterfaces(), new RetryClientInvocationHandler(conf, target, transportConfig));
+  }
+
+  /**
+   * Reset existing factory and return the old one. The old factory is not closed here.
+   * Only used by tests.
+   * @param factory new factory to use. May be null.
+   * @return the factory installed before this call, or null if none was installed
+   */
+  public static  SentryServiceClientFactory factoryReset(SentryServiceClientFactory factory) {
+    return clientFactory.getAndSet(factory);
+  }
+
+  void close() {
+    try {
+      transportPool.close();
+    } catch (Exception e) {
+      LOGGER.error("failed to close transport pool", e);
     }
   }
 }
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
deleted file mode 100644
index 0164fa6..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.service.thrift;
-
-import org.apache.commons.pool2.BasePooledObjectFactory;
-import org.apache.commons.pool2.PooledObject;
-import org.apache.commons.pool2.impl.DefaultPooledObject;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
-import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * SentryServiceClientPoolFactory is for connection pool to manage the object. Implement the related
- * method to create object, destroy object and wrap object.
- */
-
-public class SentryServiceClientPoolFactory extends BasePooledObjectFactory<SentryPolicyServiceClient> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(SentryServiceClientPoolFactory.class);
-
-  private final String addr;
-  private final int port;
-  private final Configuration conf;
-
-  public SentryServiceClientPoolFactory(String addr, int port,
-                                        Configuration conf) {
-    this.addr = addr;
-    this.port = port;
-    this.conf = conf;
-  }
-
-  @Override
-  public SentryPolicyServiceClient create() throws Exception {
-    LOGGER.debug("Creating Sentry Service Client...");
-    return new SentryPolicyServiceClientDefaultImpl(addr, port, conf);
-  }
-
-  @Override
-  public PooledObject<SentryPolicyServiceClient> wrap(SentryPolicyServiceClient client) {
-    return new DefaultPooledObject<SentryPolicyServiceClient>(client);
-  }
-
-  @Override
-  public void destroyObject(PooledObject<SentryPolicyServiceClient> pooledObject) {
-    SentryPolicyServiceClient client = pooledObject.getObject();
-    LOGGER.debug("Destroying Sentry Service Client: " + client);
-    if (client != null) {
-      // The close() of TSocket or TSaslClientTransport is called actually, and there has no
-      // exception even there has some problems, eg, the client is closed already.
-      // The close here is just try to close the socket and the client will be destroyed soon.
-      client.close();
-    }
-  }
-}
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java
index a4dd8a6..32e67b9 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java
@@ -44,6 +44,7 @@
     runTestAsSubject(new TestOperation() {
       @Override
       public void runTestAsSubject() throws Exception {
+        SentryServiceClientFactory oldFactory = SentryServiceClientFactory.factoryReset(null);
         Configuration confWithSmallMaxMsgSize = new Configuration(conf);
         confWithSmallMaxMsgSize.setLong(ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE, 20);
         // create a client with a small thrift max message size
@@ -63,6 +64,7 @@
         } finally {
           Assert.assertEquals(true, exceptionThrown);
           clientWithSmallMaxMsgSize.close();
+          SentryServiceClientFactory.factoryReset(oldFactory);
         }
 
         // client can still talk with sentry server when message size is smaller.
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
deleted file mode 100644
index a202775..0000000
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.service.thrift;
-
-import com.google.common.net.HostAndPort;
-import org.apache.sentry.core.common.utils.ThriftUtil;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestPoolClientInvocationHandler {
-  private static final Logger LOGGER =
-    LoggerFactory.getLogger(TestPoolClientInvocationHandler.class);
-
-  private void expectParseHostPortStrings(String hostsAndPortsStr,
-        String[] expectedHosts, int[] expectedPorts) throws Exception {
-    boolean success = false;
-    String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
-    HostAndPort[] hostsAndPorts;
-    try {
-      hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, 8038);
-      success = true;
-    } finally {
-      if (!success) {
-        LOGGER.error("Caught exception while parsing hosts/ports string " +
-            hostsAndPortsStr);
-      }
-    }
-    String[] hosts = new String[hostsAndPortsStrArr.length];
-    int[] ports = new int[hostsAndPortsStrArr.length];
-    parseHostsAndPorts(hostsAndPorts, hosts, ports);
-    Assert.assertArrayEquals("Got unexpected hosts results while " +
-        "parsing " + hostsAndPortsStr, expectedHosts, hosts);
-    Assert.assertArrayEquals("Got unexpected ports results while " +
-        "parsing " + hostsAndPortsStr, expectedPorts, ports);
-  }
-
-  private void parseHostsAndPorts(HostAndPort[] hostsAndPorts, String[] hosts, int[] ports) {
-    for (int i = 0; i < hostsAndPorts.length; i++) {
-      hosts[i] = hostsAndPorts[i].getHostText();
-      ports[i] = hostsAndPorts[i].getPort();
-    }
-  }
-
-  @SuppressWarnings("PMD.AvoidUsingHardCodedIP")
-  @Test
-  public void testParseHostPortStrings() throws Exception {
-    expectParseHostPortStrings("foo", new String[] {"foo"}, new int[] {8038});
-    expectParseHostPortStrings("foo,bar",
-        new String[] {"foo", "bar"},
-        new int[] {8038, 8038});
-    expectParseHostPortStrings("foo:2020,bar:2021",
-        new String[] {"foo", "bar"},
-        new int[] {2020, 2021});
-    expectParseHostPortStrings("127.0.0.1:2020,127.1.0.1",
-        new String[] {"127.0.0.1", "127.1.0.1"},
-        new int[] {2020, 8038});
-    expectParseHostPortStrings("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:433",
-        new String[] {"2001:db8:85a3:8d3:1319:8a2e:370:7348"},
-        new int[] {433});
-  }
-}
diff --git a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java
index bead003..7c45999 100644
--- a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java
+++ b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java
@@ -28,6 +28,7 @@
 import org.apache.sentry.core.model.kafka.Host;
 import org.apache.sentry.kafka.conf.KafkaAuthConf;
 import org.apache.sentry.provider.db.generic.SentryGenericProviderBackend;
+import org.apache.sentry.provider.db.generic.UpdatableCache;
 import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClient;
 import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientFactory;
 import org.apache.sentry.provider.db.generic.service.thrift.TAuthorizable;
@@ -79,8 +80,12 @@
 
   @BeforeClass
   public static void beforeTestEndToEnd() throws Exception {
+    // Stop background update thread
+    UpdatableCache.disable();
     setupConf();
     startSentryServer();
+    // We started a new server, invalidate all connections to the old one
+    SentryGenericServiceClientFactory.factoryReset();
     setUserGroups();
     setAdminPrivilege();
     startKafkaServer();
@@ -88,8 +93,10 @@
 
   @AfterClass
   public static void afterTestEndToEnd() throws Exception {
-    stopSentryServer();
+    // Stop background update thread
+    UpdatableCache.disable();
     stopKafkaServer();
+    stopSentryServer();
   }
 
   private static void stopKafkaServer() {
@@ -170,10 +177,8 @@
   }
 
   public static void setAdminPrivilege() throws Exception {
-    SentryGenericServiceClient sentryClient = null;
-    try {
-      /** grant all privilege to admin user */
-      sentryClient = getSentryClient();
+    try (SentryGenericServiceClient sentryClient = getSentryClient()){
+      // grant all privilege to admin user
       sentryClient.createRoleIfNotExist(ADMIN_USER, ADMIN_ROLE, COMPONENT);
       sentryClient.addRoleToGroups(ADMIN_USER, ADMIN_ROLE, COMPONENT, Sets.newHashSet(ADMIN_GROUP));
       final ArrayList<TAuthorizable> authorizables = new ArrayList<TAuthorizable>();
@@ -184,14 +189,10 @@
       sentryClient.grantPrivilege(ADMIN_USER, ADMIN_ROLE, COMPONENT,
           new TSentryPrivilege(COMPONENT, "kafka", authorizables,
               KafkaActionConstant.ALL));
-    } finally {
-      if (sentryClient != null) {
-        sentryClient.close();
-      }
     }
   }
 
-  protected static SentryGenericServiceClient getSentryClient() throws Exception {
+  static SentryGenericServiceClient getSentryClient() throws Exception {
     return SentryGenericServiceClientFactory.create(getClientConfig());
   }
 
diff --git a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java
index 0b1ef68..6d2cabf 100644
--- a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java
+++ b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java
@@ -34,6 +34,7 @@
 import org.apache.sentry.core.model.kafka.Host;
 import org.apache.sentry.core.model.kafka.Topic;
 import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClient;
+import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientFactory;
 import org.apache.sentry.provider.db.generic.service.thrift.TAuthorizable;
 import org.apache.sentry.provider.db.generic.service.thrift.TSentryPrivilege;
 import org.junit.Assert;
@@ -55,6 +56,8 @@
 
   @Test
   public void testProduceConsumeForSuperuser() {
+    LOGGER.debug("testProduceConsumeForSuperuser");
+    SentryGenericServiceClientFactory.factoryReset();
     try {
       final String SuperuserName = "test";
       testProduce(SuperuserName);
@@ -66,8 +69,11 @@
 
   @Test
   public void testProduceConsumeCycle() throws Exception {
+    LOGGER.debug("testProduceConsumeCycle");
     final String localhost = InetAddress.getLocalHost().getHostAddress();
 
+    // SentryGenericServiceClientFactory.factoryReset();
+
     // START TESTING PRODUCER
     try {
       testProduce("user1");
diff --git a/sentry-tests/sentry-tests-sqoop/src/test/java/org/apache/sentry/tests/e2e/sqoop/AbstractSqoopSentryTestBase.java b/sentry-tests/sentry-tests-sqoop/src/test/java/org/apache/sentry/tests/e2e/sqoop/AbstractSqoopSentryTestBase.java
index 8a01e1c..80f158a 100644
--- a/sentry-tests/sentry-tests-sqoop/src/test/java/org/apache/sentry/tests/e2e/sqoop/AbstractSqoopSentryTestBase.java
+++ b/sentry-tests/sentry-tests-sqoop/src/test/java/org/apache/sentry/tests/e2e/sqoop/AbstractSqoopSentryTestBase.java
@@ -197,19 +197,14 @@
   }
 
   public static void setAdminPrivilege() throws Exception {
-    SentryGenericServiceClient sentryClient = null;
-    try {
-      /** grant all privilege to admin user */
-      sentryClient = SentryGenericServiceClientFactory.create(getClientConfig());
+    try (SentryGenericServiceClient sentryClient =
+                 SentryGenericServiceClientFactory.create(getClientConfig())){
+      // grant all privilege to admin user
       sentryClient.createRoleIfNotExist(ADMIN_USER, ADMIN_ROLE, COMPONENT);
       sentryClient.addRoleToGroups(ADMIN_USER, ADMIN_ROLE, COMPONENT, Sets.newHashSet(ADMIN_GROUP));
       sentryClient.grantPrivilege(ADMIN_USER, ADMIN_ROLE, COMPONENT,
           new TSentryPrivilege(COMPONENT, SQOOP_SERVER_NAME, new ArrayList<TAuthorizable>(),
               SqoopActionConstant.ALL));
-    } finally {
-      if (sentryClient != null) {
-        sentryClient.close();
-      }
     }
   }