SENTRY-2539:PolicyEngine should be able to return privilege directly (Na Li reviewed by Kalyan kumar kalvagadda)
diff --git a/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/hive/authz/HiveAuthzBinding.java b/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/hive/authz/HiveAuthzBinding.java
index 5c7f84f..3064052 100644
--- a/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/hive/authz/HiveAuthzBinding.java
+++ b/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/hive/authz/HiveAuthzBinding.java
@@ -43,6 +43,7 @@
 import org.apache.sentry.core.model.db.HivePrivilegeModel;
 import org.apache.sentry.core.model.db.Server;
 import org.apache.sentry.policy.common.PolicyEngine;
+import org.apache.sentry.policy.common.PrivilegeFactory;
 import org.apache.sentry.provider.cache.PrivilegeCache;
 import org.apache.sentry.provider.cache.SimpleCacheProviderBackend;
 import org.apache.sentry.provider.common.AuthorizationProvider;
@@ -248,6 +249,10 @@
             HivePrivilegeModel.getInstance()});
   }
 
+  public PrivilegeFactory getPrivilegeFactory() {
+    return authProvider.getPolicyEngine().getPrivilegeFactory();
+  }
+
   // Instantiate the authz provider using PrivilegeCache, this method is used for metadata filter function.
   public static AuthorizationProvider getAuthProviderWithPrivilegeCache(HiveAuthzConf authzConf,
       String serverName, PrivilegeCache privilegeCache) throws Exception {
@@ -289,9 +294,6 @@
 
   /**
    * Validate the privilege for the given operation for the given subject
-   * @param currDB
-   * @param inputEntities
-   * @param outputEntities
    * @param hiveOp
    * @param stmtAuthPrivileges
    * @param subject
diff --git a/sentry-binding/sentry-binding-hive-conf/src/main/java/org/apache/sentry/binding/hive/conf/HiveAuthzConf.java b/sentry-binding/sentry-binding-hive-conf/src/main/java/org/apache/sentry/binding/hive/conf/HiveAuthzConf.java
index 8f45c60..95e6ef1 100644
--- a/sentry-binding/sentry-binding-hive-conf/src/main/java/org/apache/sentry/binding/hive/conf/HiveAuthzConf.java
+++ b/sentry-binding/sentry-binding-hive-conf/src/main/java/org/apache/sentry/binding/hive/conf/HiveAuthzConf.java
@@ -97,6 +97,7 @@
         AUTHZ_PROVIDER_RESOURCE("sentry.hive.provider.resource", ""),
         AUTHZ_PROVIDER_BACKEND("sentry.hive.provider.backend", "org.apache.sentry.provider.file.SimpleFileProviderBackend"),
         AUTHZ_POLICY_ENGINE("sentry.hive.policy.engine", "org.apache.sentry.policy.engine.common.CommonPolicyEngine"),
+        AUTHZ_PRIVILEGE_CACHE("sentry.hive.privilege.cache", "org.apache.sentry.provider.cache.TreePrivilegeCache"),
         AUTHZ_POLICY_FILE_FORMATTER(
                 "sentry.hive.policy.file.formatter",
                 "org.apache.sentry.binding.hive.SentryIniPolicyFileFormatter"),
diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/hive/authz/HiveAuthzBindingHookBase.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/hive/authz/HiveAuthzBindingHookBase.java
index de88705..a4b664b 100644
--- a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/hive/authz/HiveAuthzBindingHookBase.java
+++ b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/hive/authz/HiveAuthzBindingHookBase.java
@@ -53,6 +53,7 @@
 import org.apache.sentry.binding.hive.authz.HiveAuthzPrivileges.HiveOperationScope;
 import org.apache.sentry.binding.hive.authz.HiveAuthzPrivileges.HiveOperationType;
 import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
+import org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars;
 import org.apache.sentry.core.common.Subject;
 import org.apache.sentry.core.common.exception.SentryGroupNotFoundException;
 import org.apache.sentry.core.common.utils.PathUtils;
@@ -63,13 +64,14 @@
 import org.apache.sentry.core.model.db.DBModelAuthorizable.AuthorizableType;
 import org.apache.sentry.core.model.db.Database;
 import org.apache.sentry.core.model.db.Table;
+import org.apache.sentry.policy.common.PrivilegeFactory;
 import org.apache.sentry.provider.cache.PrivilegeCache;
-import org.apache.sentry.provider.cache.SimplePrivilegeCache;
 import org.apache.sentry.provider.common.AuthorizationProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.lang.reflect.Constructor;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
@@ -87,7 +89,7 @@
   private static final Logger LOG = LoggerFactory
       .getLogger(HiveAuthzBindingHookBase.class);
   protected final HiveAuthzBinding hiveAuthzBinding;
-  protected final HiveAuthzConf authzConf;
+  static HiveAuthzConf authzConf;
   protected Database currDB = Database.ALL;
   protected Table currTab;
   protected List<AccessURI> udfURIs;
@@ -849,7 +851,8 @@
               hiveAuthzBinding.getActiveRoleSet(), hiveAuthzBinding.getAuthServer());
 
       // create PrivilegeCache using user's privileges
-      PrivilegeCache privilegeCache = new SimplePrivilegeCache(userPrivileges);
+      PrivilegeCache privilegeCache = getPrivilegeCache(userPrivileges, hiveAuthzBinding.getPrivilegeFactory());
+
       // create new instance of HiveAuthzBinding whose backend provider should be SimpleCacheProviderBackend
       return new HiveAuthzBinding(HiveAuthzBinding.HiveHook.HiveServer2, hiveAuthzBinding.getHiveConf(),
               hiveAuthzBinding.getAuthzConf(), privilegeCache);
@@ -859,6 +862,33 @@
     }
   }
 
+  private static PrivilegeCache getPrivilegeCache(Set<String> userPrivileges, PrivilegeFactory inPrivilegeFactory) throws Exception {
+    String privilegeCacheName = authzConf.get(AuthzConfVars.AUTHZ_PRIVILEGE_CACHE.getVar(),
+      AuthzConfVars.AUTHZ_PRIVILEGE_CACHE.getDefault());
+
+    LOG.info("Using privilege cache " + privilegeCacheName);
+
+    try {
+      // load the privilege cache class that takes privilege factory as input
+      Constructor<?> cacheConstructor =
+        Class.forName(privilegeCacheName).getDeclaredConstructor(Set.class, PrivilegeFactory.class);
+      if (cacheConstructor != null) {
+        cacheConstructor.setAccessible(true);
+        return (PrivilegeCache) cacheConstructor.
+          newInstance(userPrivileges, inPrivilegeFactory);
+      }
+
+      // load the privilege cache class that does not use privilege factory
+      cacheConstructor = Class.forName(privilegeCacheName).getDeclaredConstructor(Set.class);
+      cacheConstructor.setAccessible(true);
+      return (PrivilegeCache) cacheConstructor.
+        newInstance(userPrivileges);
+    } catch (Exception ex) {
+      LOG.error("Exception at creating privilege cache", ex);
+      throw ex;
+    }
+  }
+
   private static boolean hasPrefixMatch(List<String> prefixList, final String str) {
     for (String prefix : prefixList) {
       if (str.startsWith(prefix)) {
diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/MetastoreAuthzBindingBase.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/MetastoreAuthzBindingBase.java
index 2940a1e..bc7a554 100644
--- a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/MetastoreAuthzBindingBase.java
+++ b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/MetastoreAuthzBindingBase.java
@@ -19,6 +19,7 @@
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
+import java.lang.reflect.Constructor;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -64,8 +65,8 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import org.apache.sentry.policy.common.PrivilegeFactory;
 import org.apache.sentry.provider.cache.PrivilegeCache;
-import org.apache.sentry.provider.cache.SimplePrivilegeCache;
 import org.apache.sentry.provider.common.AuthorizationProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -141,7 +142,7 @@
 
   private static final Logger LOG = LoggerFactory
       .getLogger(MetastoreAuthzBindingBase.class);
-  private HiveAuthzConf authzConf;
+  private static HiveAuthzConf authzConf;
   private final Server authServer;
   private final HiveConf hiveConf;
   private final ImmutableSet<String> serviceUsers;
@@ -500,7 +501,7 @@
               hiveAuthzBinding.getActiveRoleSet(), hiveAuthzBinding.getAuthServer());
 
       // create PrivilegeCache using user's privileges
-      PrivilegeCache privilegeCache = new SimplePrivilegeCache(userPrivileges);
+      PrivilegeCache privilegeCache = getPrivilegeCache(userPrivileges, hiveAuthzBinding.getPrivilegeFactory());
       // create new instance of HiveAuthzBinding whose backend provider should be SimpleCacheProviderBackend
       return new HiveAuthzBinding(HiveAuthzBinding.HiveHook.HiveMetaStore, hiveAuthzBinding.getHiveConf(),
           hiveAuthzBinding.getAuthzConf(), privilegeCache);
@@ -520,6 +521,34 @@
     }
   }
 
+  protected static PrivilegeCache getPrivilegeCache(Set<String> userPrivileges, PrivilegeFactory inPrivilegeFactory) throws Exception {
+    String privilegeCacheName = authzConf.get(AuthzConfVars.AUTHZ_PRIVILEGE_CACHE.getVar(),
+      AuthzConfVars.AUTHZ_PRIVILEGE_CACHE.getDefault());
+
+    LOG.info("Using privilege cache " + privilegeCacheName);
+
+    try {
+      // load the privilege cache class that takes privilege factory as input
+      Constructor<?> cacheConstructor =
+        Class.forName(privilegeCacheName).getDeclaredConstructor(Set.class, PrivilegeFactory.class);
+      if (cacheConstructor != null) {
+        cacheConstructor.setAccessible(true);
+        return (PrivilegeCache) cacheConstructor.
+          newInstance(userPrivileges, inPrivilegeFactory);
+      }
+
+      // load the privilege cache class that does not use privilege factory
+      cacheConstructor = Class.forName(privilegeCacheName).getDeclaredConstructor(Set.class);
+      cacheConstructor.setAccessible(true);
+      return (PrivilegeCache) cacheConstructor.
+        newInstance(userPrivileges);
+    } catch (Exception ex) {
+      LOG.error("Exception at creating privilege cache", ex);
+      throw ex;
+    }
+  }
+
+
   private String getSdLocation(StorageDescriptor sd) {
     if (sd == null) {
       return "";
diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetaStoreFilterHook.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetaStoreFilterHook.java
index 8e09490..3bd8c44 100644
--- a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetaStoreFilterHook.java
+++ b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetaStoreFilterHook.java
@@ -230,6 +230,10 @@
     } catch (Exception e) {
       LOG.warn("Error getting DB list ", e);
       return Collections.emptyList();
+    } finally {
+      // authzBinding.close() is called at end of this block. And privilege cache will be reset.
+      // Clear this field to make sure getting a new binding with cache for next filtering
+      hiveAuthzBinding = null;
     }
   }
 
@@ -266,6 +270,10 @@
     } catch (Exception e) {
       LOG.warn("Error getting Table list ", e);
       return Collections.emptyList();
+    } finally {
+      // authzBinding.close() is called at end of this block. And privilege cache will be reset.
+      // Clear this field to make sure getting a new binding with cache for next filtering
+      hiveAuthzBinding = null;
     }
   }
 
diff --git a/sentry-binding/sentry-binding-hive/src/test/java/org/apache/sentry/privilege/hive/TestCommonPrivilegeForHive.java b/sentry-binding/sentry-binding-hive/src/test/java/org/apache/sentry/privilege/hive/TestCommonPrivilegeForHive.java
index 6a8b871..cd57163 100644
--- a/sentry-binding/sentry-binding-hive/src/test/java/org/apache/sentry/privilege/hive/TestCommonPrivilegeForHive.java
+++ b/sentry-binding/sentry-binding-hive/src/test/java/org/apache/sentry/privilege/hive/TestCommonPrivilegeForHive.java
@@ -217,6 +217,9 @@
         return null;
       }
 
+      @Override
+      public List<KeyValue> getParts() { return null; }
+
     };
     assertFalse(ROLE_SERVER_SERVER1_DB_ALL.implies(null, hivePrivilegeModel));
     assertFalse(ROLE_SERVER_SERVER1_DB_ALL.implies(p, hivePrivilegeModel));
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/privilege/kafka/TestKafkaWildcardPrivilege.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/privilege/kafka/TestKafkaWildcardPrivilege.java
index 0a0e2f0..5b8e734 100644
--- a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/privilege/kafka/TestKafkaWildcardPrivilege.java
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/privilege/kafka/TestKafkaWildcardPrivilege.java
@@ -144,6 +144,9 @@
       public List<KeyValue> getAuthorizable() {
         return null;
       }
+
+      @Override
+      public List<KeyValue> getParts() { return null; }
     };
     Privilege topic1 = create(new KeyValue("HOST", "host"), new KeyValue("TOPIC", "topic1"));
     assertFalse(topic1.implies(null, kafkaPrivilegeModel));
diff --git a/sentry-binding/sentry-binding-solr/src/test/java/org/apache/sentry/privilege/solr/TestCommonPrivilegeForSolr.java b/sentry-binding/sentry-binding-solr/src/test/java/org/apache/sentry/privilege/solr/TestCommonPrivilegeForSolr.java
index 6782089..c4d736c 100644
--- a/sentry-binding/sentry-binding-solr/src/test/java/org/apache/sentry/privilege/solr/TestCommonPrivilegeForSolr.java
+++ b/sentry-binding/sentry-binding-solr/src/test/java/org/apache/sentry/privilege/solr/TestCommonPrivilegeForSolr.java
@@ -242,6 +242,9 @@
       public List<KeyValue> getAuthorizable() {
         return null;
       }
+
+      @Override
+      public List<KeyValue> getParts() { return null; }
     };
     Privilege collection1 = create(new KeyValue("collection", "coll1"));
     assertFalse(collection1.implies(null, solrPrivilegeModel));
diff --git a/sentry-binding/sentry-binding-sqoop/src/test/java/org/apache/sentry/privilege/sqoop/TestCommonPrivilegeForSqoop.java b/sentry-binding/sentry-binding-sqoop/src/test/java/org/apache/sentry/privilege/sqoop/TestCommonPrivilegeForSqoop.java
index 94e9919..6b73e95 100644
--- a/sentry-binding/sentry-binding-sqoop/src/test/java/org/apache/sentry/privilege/sqoop/TestCommonPrivilegeForSqoop.java
+++ b/sentry-binding/sentry-binding-sqoop/src/test/java/org/apache/sentry/privilege/sqoop/TestCommonPrivilegeForSqoop.java
@@ -152,6 +152,9 @@
       public List<KeyValue> getAuthorizable() {
         return null;
       }
+
+      @Override
+      public List<KeyValue> getParts() { return null; }
     };
     Privilege job1 = create(new KeyValue("SERVER", "server"), new KeyValue("JOB", "job1"));
     assertFalse(job1.implies(null, sqoopPrivilegeModel));
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/KeyValue.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/KeyValue.java
index b6a1faa..f99a15a 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/KeyValue.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/KeyValue.java
@@ -27,23 +27,33 @@
   private final String value;
 
   public KeyValue(String keyValue) {
-    List<String> kvList = Lists.newArrayList(SentryConstants.KV_SPLITTER.trimResults().limit(2).split(keyValue));
-    if (kvList.size() != 2) {
-      throw new IllegalArgumentException("Invalid key value: " + keyValue + " " + kvList);
+    int splitterIndex = keyValue.indexOf(SentryConstants.KV_SEPARATOR);
+    int splitterIndexEnd = keyValue.lastIndexOf(SentryConstants.KV_SEPARATOR);
+
+    if ((splitterIndex > 0) && (splitterIndex == splitterIndexEnd)) {
+      // optimize for simple case
+      key = keyValue.substring(0, splitterIndex).trim().intern();
+      value = keyValue.substring(splitterIndex + 1).trim().intern();
+    } else {
+      List<String> kvList = Lists.newArrayList(SentryConstants.KV_SPLITTER.trimResults().limit(2).split(keyValue));
+      if (kvList.size() != 2) {
+        throw new IllegalArgumentException("Invalid key value: " + keyValue + " " + kvList);
+      }
+      key = kvList.get(0);
+      value = kvList.get(1);
     }
-    key = kvList.get(0);
-    value = kvList.get(1);
+
     if (key.isEmpty()) {
-      throw new IllegalArgumentException("kvList=[" + kvList + "] for keyValue[" + keyValue + "], Key cannot be empty");
+      throw new IllegalArgumentException("For keyValue: " + keyValue + ", Key cannot be empty");
     } else if (value.isEmpty()) {
-      throw new IllegalArgumentException("kvList=[" + kvList + "] for keyValue[" + keyValue + "], Value cannot be empty");
+      throw new IllegalArgumentException("For keyValue: " + keyValue + ", Value cannot be empty");
     }
   }
 
   public KeyValue(String key, String value) {
     super();
-    this.key = key;
-    this.value = value;
+    this.key = key.intern();
+    this.value = value.intern();
   }
 
   public String getKey() {
diff --git a/sentry-core/sentry-core-model-db/src/main/java/org/apache/sentry/core/model/db/DBModelAuthorizables.java b/sentry-core/sentry-core-model-db/src/main/java/org/apache/sentry/core/model/db/DBModelAuthorizables.java
index 7bc94c9..f768f21 100644
--- a/sentry-core/sentry-core-model-db/src/main/java/org/apache/sentry/core/model/db/DBModelAuthorizables.java
+++ b/sentry-core/sentry-core-model-db/src/main/java/org/apache/sentry/core/model/db/DBModelAuthorizables.java
@@ -37,6 +37,7 @@
         }
       }
     }
+
     return null;
   }
   public static DBModelAuthorizable from(String s) {
diff --git a/sentry-core/sentry-core-model-db/src/main/java/org/apache/sentry/core/model/db/validator/AbstractDBPrivilegeValidator.java b/sentry-core/sentry-core-model-db/src/main/java/org/apache/sentry/core/model/db/validator/AbstractDBPrivilegeValidator.java
index fa28716..2b8ce87 100644
--- a/sentry-core/sentry-core-model-db/src/main/java/org/apache/sentry/core/model/db/validator/AbstractDBPrivilegeValidator.java
+++ b/sentry-core/sentry-core-model-db/src/main/java/org/apache/sentry/core/model/db/validator/AbstractDBPrivilegeValidator.java
@@ -47,5 +47,4 @@
     }
     return result;
   }
-
 }
diff --git a/sentry-policy/sentry-policy-common/src/main/java/org/apache/sentry/policy/common/CommonPrivilege.java b/sentry-policy/sentry-policy-common/src/main/java/org/apache/sentry/policy/common/CommonPrivilege.java
index 5b261e3..60eb45f 100644
--- a/sentry-policy/sentry-policy-common/src/main/java/org/apache/sentry/policy/common/CommonPrivilege.java
+++ b/sentry-policy/sentry-policy-common/src/main/java/org/apache/sentry/policy/common/CommonPrivilege.java
@@ -216,6 +216,7 @@
     return SentryConstants.AUTHORIZABLE_JOINER.join(parts);
   }
 
+  @Override
   public List<KeyValue> getParts() {
     return parts;
   }
diff --git a/sentry-policy/sentry-policy-common/src/main/java/org/apache/sentry/policy/common/PolicyEngine.java b/sentry-policy/sentry-policy-common/src/main/java/org/apache/sentry/policy/common/PolicyEngine.java
index 504b5ea..9f296d1 100644
--- a/sentry-policy/sentry-policy-common/src/main/java/org/apache/sentry/policy/common/PolicyEngine.java
+++ b/sentry-policy/sentry-policy-common/src/main/java/org/apache/sentry/policy/common/PolicyEngine.java
@@ -80,7 +80,7 @@
       throws SentryConfigurationException;
 
   /**
-   * Get privileges associated with groups and users. Returns Strings which can be resolved by the
+   * Get privileges in string associated with groups and users. Returns Strings which can be resolved by the
    * caller. Strings are returned to separate the PolicyFile class from the type of privileges used
    * in a policy file. Additionally it is possible further processing of the privileges is needed
    * before resolving to a privilege object.
@@ -94,6 +94,20 @@
   ImmutableSet<String> getPrivileges(Set<String> groups, Set<String> users, ActiveRoleSet roleSet,
       Authorizable... authorizableHierarchy) throws SentryConfigurationException;
 
+  /**
+   * Get privilege objects associated with groups and users. Returns Strings which can be resolved by the
+   * caller.
+   *
+   * @param groups
+   * @param users
+   * @param roleSet
+   * @param authorizableHierarchy
+   * @return
+   * @throws SentryConfigurationException
+   */
+  ImmutableSet<Privilege> getPrivilegeObjects(Set<String> groups, Set<String> users,
+      ActiveRoleSet roleSet, Authorizable... authorizableHierarchy) throws SentryConfigurationException;
+
   void close();
 
   void validatePolicy(boolean strictValidation) throws SentryConfigurationException;
diff --git a/sentry-policy/sentry-policy-common/src/main/java/org/apache/sentry/policy/common/Privilege.java b/sentry-policy/sentry-policy-common/src/main/java/org/apache/sentry/policy/common/Privilege.java
index 6c2737a..e60feab 100644
--- a/sentry-policy/sentry-policy-common/src/main/java/org/apache/sentry/policy/common/Privilege.java
+++ b/sentry-policy/sentry-policy-common/src/main/java/org/apache/sentry/policy/common/Privilege.java
@@ -28,6 +28,11 @@
    **/
   boolean implies(Privilege p, Model model);
 
+  /**
+   * Return the list of parts of the privilege
+   * @return List of parts of the privilege
+   */
+  List<KeyValue> getParts();
 
   /**
    * Return the list of authorizeable of the privilege.
diff --git a/sentry-policy/sentry-policy-engine/src/main/java/org/apache/sentry/policy/engine/common/CommonPolicyEngine.java b/sentry-policy/sentry-policy-engine/src/main/java/org/apache/sentry/policy/engine/common/CommonPolicyEngine.java
index a819bb0..ab2f4f3 100644
--- a/sentry-policy/sentry-policy-engine/src/main/java/org/apache/sentry/policy/engine/common/CommonPolicyEngine.java
+++ b/sentry-policy/sentry-policy-engine/src/main/java/org/apache/sentry/policy/engine/common/CommonPolicyEngine.java
@@ -21,6 +21,7 @@
 import org.apache.sentry.core.common.Authorizable;
 import org.apache.sentry.core.common.exception.SentryConfigurationException;
 import org.apache.sentry.policy.common.PolicyEngine;
+import org.apache.sentry.policy.common.Privilege;
 import org.apache.sentry.policy.common.PrivilegeFactory;
 import org.apache.sentry.provider.common.ProviderBackend;
 import org.slf4j.Logger;
@@ -93,6 +94,20 @@
   }
 
   @Override
+  public ImmutableSet<Privilege> getPrivilegeObjects(Set<String> groups, Set<String> users,
+      ActiveRoleSet roleSet, Authorizable... authorizableHierarchy)
+      throws SentryConfigurationException {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Getting permissions for groups: {}, users: {}", groups, users);
+    }
+    ImmutableSet<Privilege> result = providerBackend.getPrivilegeObjects(groups, users, roleSet, authorizableHierarchy);
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("result = " + result);
+    }
+    return result;
+  }
+
+  @Override
   public void validatePolicy(boolean strictValidation) throws SentryConfigurationException {
     this.providerBackend.validatePolicy(strictValidation);
   }
diff --git a/sentry-provider/sentry-provider-cache/src/main/java/org/apache/sentry/provider/cache/FilteredPrivilegeCache.java b/sentry-provider/sentry-provider-cache/src/main/java/org/apache/sentry/provider/cache/FilteredPrivilegeCache.java
new file mode 100644
index 0000000..d592b4f
--- /dev/null
+++ b/sentry-provider/sentry-provider-cache/src/main/java/org/apache/sentry/provider/cache/FilteredPrivilegeCache.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.provider.cache;
+
+import java.util.Set;
+import org.apache.sentry.core.common.ActiveRoleSet;
+import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.policy.common.Privilege;
+
+/**
+ * The cache returns privileges based on the input authorizable hierarchy. This filtering
+ * reduces the number of returned privileges for authorization check to improve performance
+ */
+public interface FilteredPrivilegeCache extends PrivilegeCache {
+
+  /**
+   * Get the privileges in string for the give set of groups and users with the give active
+   * roles and authorization hierarchy from the cache.
+   */
+  Set<String> listPrivileges(Set<String> groups, Set<String> users, ActiveRoleSet roleSet,
+    Authorizable... authorizationhierarchy);
+
+  /**
+   * Get the privilege objects for the give set of groups and users with the give active
+   * roles and authorization hierarchy from the cache.
+   */
+  Set<Privilege> listPrivilegeObjects(Set<String> groups, Set<String> users, ActiveRoleSet roleSet,
+    Authorizable... authorizationhierarchy);
+}
diff --git a/sentry-provider/sentry-provider-cache/src/main/java/org/apache/sentry/provider/cache/PrivilegeCache.java b/sentry-provider/sentry-provider-cache/src/main/java/org/apache/sentry/provider/cache/PrivilegeCache.java
index 4bb6d32..1f8f229 100644
--- a/sentry-provider/sentry-provider-cache/src/main/java/org/apache/sentry/provider/cache/PrivilegeCache.java
+++ b/sentry-provider/sentry-provider-cache/src/main/java/org/apache/sentry/provider/cache/PrivilegeCache.java
@@ -20,7 +20,6 @@
 import java.util.Set;
 
 import org.apache.sentry.core.common.ActiveRoleSet;
-import org.apache.sentry.core.common.Authorizable;
 
 public interface PrivilegeCache {
   /**
@@ -39,11 +38,5 @@
   Set<String> listPrivileges(Set<String> groups, Set<String> users,
       ActiveRoleSet roleSet);
 
-  /**
-   * Get the privileges for the give set of groups and users with the give active
-   * roles and authorization hierarchy from the cache.
-   */
-  Set<String> listPrivileges(Set<String> groups, Set<String> users, ActiveRoleSet roleSet,
-      Authorizable... authorizationhierarchy);
   void close();
 }
diff --git a/sentry-provider/sentry-provider-cache/src/main/java/org/apache/sentry/provider/cache/SimpleCacheProviderBackend.java b/sentry-provider/sentry-provider-cache/src/main/java/org/apache/sentry/provider/cache/SimpleCacheProviderBackend.java
index ddb4ec5..408f447 100644
--- a/sentry-provider/sentry-provider-cache/src/main/java/org/apache/sentry/provider/cache/SimpleCacheProviderBackend.java
+++ b/sentry-provider/sentry-provider-cache/src/main/java/org/apache/sentry/provider/cache/SimpleCacheProviderBackend.java
@@ -23,6 +23,7 @@
 import org.apache.sentry.core.common.ActiveRoleSet;
 import org.apache.sentry.core.common.Authorizable;
 import org.apache.sentry.core.common.exception.SentryConfigurationException;
+import org.apache.sentry.policy.common.Privilege;
 import org.apache.sentry.provider.common.ProviderBackend;
 import org.apache.sentry.provider.common.ProviderBackendContext;
 
@@ -72,8 +73,34 @@
       throw new IllegalStateException(
           "Backend has not been properly initialized");
     }
-    return ImmutableSet.copyOf(cacheHandle.listPrivileges(groups, users,
+
+    if (cacheHandle instanceof FilteredPrivilegeCache) {
+      FilteredPrivilegeCache filteredPrivilegeCache = (FilteredPrivilegeCache)cacheHandle;
+      return ImmutableSet.copyOf(filteredPrivilegeCache.listPrivileges(groups, users,
         roleSet, authorizableHierarchy));
+    }
+
+    return ImmutableSet.copyOf(cacheHandle.listPrivileges(groups,
+      roleSet));
+  }
+
+  @Override
+  public ImmutableSet<Privilege> getPrivilegeObjects(Set<String> groups, Set<String> users,
+      ActiveRoleSet roleSet, Authorizable... authorizableHierarchy) {
+    if (!initialized()) {
+      throw new IllegalStateException(
+          "Backend has not been properly initialized");
+    }
+
+    if (cacheHandle instanceof FilteredPrivilegeCache) {
+      FilteredPrivilegeCache filteredPrivilegeCache = (FilteredPrivilegeCache) cacheHandle;
+      return ImmutableSet.copyOf(filteredPrivilegeCache.listPrivilegeObjects(groups, users,
+        roleSet, authorizableHierarchy));
+    }
+
+    // cacheHandle does not support this function. The caller should call getPrivileges()
+    // to get privileges
+    return ImmutableSet.of();
   }
 
   @Override
diff --git a/sentry-provider/sentry-provider-cache/src/main/java/org/apache/sentry/provider/cache/SimpleFilteredPrivilegeCache.java b/sentry-provider/sentry-provider-cache/src/main/java/org/apache/sentry/provider/cache/SimpleFilteredPrivilegeCache.java
new file mode 100644
index 0000000..48711af
--- /dev/null
+++ b/sentry-provider/sentry-provider-cache/src/main/java/org/apache/sentry/provider/cache/SimpleFilteredPrivilegeCache.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.provider.cache;
+
+import java.util.Collections;
+import java.util.stream.Collectors;
+import org.apache.sentry.core.common.ActiveRoleSet;
+import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.core.common.utils.SentryConstants;
+import org.apache.sentry.core.common.utils.KeyValue;
+import org.apache.sentry.core.model.db.DBModelAuthorizable.AuthorizableType;
+import org.apache.sentry.policy.common.CommonPrivilege;
+import org.apache.sentry.policy.common.Privilege;
+import org.apache.sentry.policy.common.PrivilegeFactory;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+import java.util.LinkedList;
+
+/*
+ * The class is used for saving and getting user's privileges when do the hive command like "show tables".
+ * This will enhance the performance for the hive metadata filter. This class is not thread safe.
+ */
+public class SimpleFilteredPrivilegeCache implements FilteredPrivilegeCache {
+
+  private Set<String> cachedPrivileges;
+  private final PrivilegeFactory privilegeFactory;
+
+  // <Authorizable, Set<PrivilegeObject>> map, this is a cache for mapping authorizable
+  // to corresponding set of privilege objects.
+  // e.g. (server=server1->database=b1, (server=server1->database=b1->action=insert))
+  private final Map<String, Set<String>> cachedAuthzPrivileges = new HashMap<>();
+
+  // <AuthorizableType, Set<AuthorizableValue>> wild card map
+  private final Map<String, Set<String>> wildCardAuthz = new HashMap<>();
+
+  public SimpleFilteredPrivilegeCache(Set<String> cachedPrivileges, PrivilegeFactory inPrivilegeFactory) {
+    this.cachedPrivileges = cachedPrivileges;
+    this.privilegeFactory = inPrivilegeFactory;
+
+    for (String cachedPrivilege : cachedPrivileges) {
+      Privilege privilege = getPrivilegeObject(cachedPrivilege);
+      List<KeyValue> authorizable = privilege.getAuthorizable();
+      String authzString = getAuthzString(authorizable);
+      updateWildCardAuthzMap(authorizable);
+
+      Set<String> authzPrivileges = cachedAuthzPrivileges.get(authzString);
+      if (authzPrivileges == null) {
+        authzPrivileges = new HashSet();
+        cachedAuthzPrivileges.put(authzString, authzPrivileges);
+      }
+      authzPrivileges.add(cachedPrivilege);
+    }
+  }
+
+  private String getAuthzString(List<KeyValue> authoriable) {
+    List<KeyValue> authz = new LinkedList<>();
+    for (KeyValue auth : authoriable) {
+
+      // For authorizable e.g. sever=server1->uri=hdfs://namenode:8020/path/,
+      // use sever=server1 as the key of cachedAuthzPrivileges, since
+      // cannot do string matchinf on URI paths.
+      if (!AuthorizableType.URI.toString().equalsIgnoreCase(auth.getKey())) {
+        authz.add(auth);
+      }
+    }
+
+    return SentryConstants.AUTHORIZABLE_JOINER.join(authz);
+  }
+
+  private void updateWildCardAuthzMap(List<KeyValue> authz) {
+    for (KeyValue auth : authz) {
+      String authKey = auth.getKey().toLowerCase();
+      String authValue = auth.getValue().toLowerCase();
+      Set<String> authzValue = wildCardAuthz.get(authKey);
+
+      if (authzValue != null ) {
+        if (!authzValue.contains(authValue)) {
+          authzValue.add(authValue);
+        }
+      } else {
+        authzValue = new HashSet<>();
+        authzValue.add(authValue);
+        wildCardAuthz.put(authKey, authzValue);
+      }
+    }
+  }
+
+  // return the cached privileges
+  @Override
+  public Set<String> listPrivileges(Set<String> groups, ActiveRoleSet roleSet) {
+    if (cachedPrivileges == null) {
+      cachedPrivileges = new HashSet<String>();
+    }
+    return cachedPrivileges;
+  }
+
+  @Override
+  public void close() {
+    if (cachedPrivileges != null) {
+      cachedPrivileges = Collections.emptySet();
+    }
+  }
+
+  @Override
+  public Set<String> listPrivileges(Set<String> groups, Set<String> users, ActiveRoleSet roleSet) {
+    if (cachedPrivileges == null) {
+      cachedPrivileges = new HashSet<String>();
+    }
+    return cachedPrivileges;
+  }
+
+  @Override
+  public Set<String> listPrivileges(Set<String> groups, Set<String> users, ActiveRoleSet roleSet,
+    Authorizable... authorizationHierarchy) {
+    Set<String> privileges = new HashSet<>();
+    Set<StringBuilder> authzKeys = getAuthzKeys(authorizationHierarchy);
+    for (StringBuilder authzKey : authzKeys) {
+      if (cachedAuthzPrivileges.get(authzKey.toString()) != null) {
+        privileges.addAll(cachedAuthzPrivileges.get(authzKey.toString()));
+      }
+    }
+
+    return privileges;
+  }
+
+  @Override
+  public Set<Privilege> listPrivilegeObjects(Set<String> groups, Set<String> users, ActiveRoleSet roleSet,
+    Authorizable... authorizationHierarchy) {
+    Set<String> privilegeStrings = listPrivileges(groups, users, roleSet, authorizationHierarchy);
+
+    return privilegeStrings.stream()
+      .filter(priString -> priString != null)
+      .map(priString -> getPrivilegeObject(priString))
+      .collect(Collectors.toSet());
+  }
+
+  private Privilege getPrivilegeObject(String priString) {
+    if (privilegeFactory != null) {
+      return privilegeFactory.createPrivilege(priString);
+    }
+
+    return new CommonPrivilege(priString);
+  }
+
+  /**
+   * Get authoriables from the <Authorizable, Set<PrivilegeObject>> cache map,
+   * based on the authorizable hierarchy. This logic follows Privilege.implies.
+   * e.g. given authorizable hierarchy:server=server1->db=db1, returns matched
+   * privileges including server=server1;server=*;server=server1->db=db1;server=server1->db=*.
+   * @param authorizationHierarchy
+   * @return
+   */
+  private Set<StringBuilder> getAuthzKeys(Authorizable... authorizationHierarchy) {
+    Set<StringBuilder> targets = new HashSet<>();
+    for (Authorizable auth : authorizationHierarchy) {
+      String authzType = auth.getTypeName().toLowerCase();
+      String authzName = auth.getName().toLowerCase();
+
+      // No op for URI authorizable type.
+      if (authzType.equalsIgnoreCase(AuthorizableType.URI.toString())) {
+        continue;
+      }
+      // If authorizable name is a wild card, need to add all possible authorizable objects
+      // basesd on the authorizable type.
+      if (authzName.equals(SentryConstants.RESOURCE_WILDCARD_VALUE) ||
+        authzName.equals(SentryConstants.RESOURCE_WILDCARD_VALUE_SOME)||
+        authzName.equals(SentryConstants.RESOURCE_WILDCARD_VALUE_ALL)) {
+        Set<String> wildcardValues = wildCardAuthz.get(authzType);
+
+        if (wildcardValues != null && wildcardValues.size() > 0) {
+          Set<StringBuilder> newTargets = new HashSet<>(targets);
+          for (StringBuilder target : targets) {
+            for (String wildcardValue : wildcardValues) {
+              newTargets.add(addAuthz(target, authzType, wildcardValue));
+            }
+          }
+
+          targets = newTargets;
+        } else {
+          return targets;
+        }
+      } else {
+        if (targets.isEmpty()) {
+          targets.add(addAuthz(new StringBuilder(), authzType, authzName));
+
+          // Add wild card * search, e.g server=*, server=ALL
+          targets.add(addAuthz(new StringBuilder(), authzType,
+            SentryConstants.RESOURCE_WILDCARD_VALUE.toLowerCase()));
+          targets.add(addAuthz(new StringBuilder(), authzType,
+            SentryConstants.RESOURCE_WILDCARD_VALUE_ALL.toLowerCase()));
+        } else {
+          Set<StringBuilder> newTargets = new HashSet<>(targets);
+
+          for (StringBuilder target : targets) {
+            newTargets.add(addAuthz(target, authzType, authzName));
+
+            // Add wild card * search, e.g server=server1->db=*, server=server1->db=ALL
+            newTargets.add(addAuthz(target, authzType,
+              SentryConstants.RESOURCE_WILDCARD_VALUE.toLowerCase()));
+            newTargets.add(addAuthz(target, authzType,
+              SentryConstants.RESOURCE_WILDCARD_VALUE_ALL.toLowerCase()));
+          }
+
+          targets = newTargets;
+        }
+      }
+    }
+
+    return targets;
+  }
+
+  private StringBuilder addAuthz(StringBuilder authorizable, String authzType, String authzName) {
+    StringBuilder newAuthrizable = new StringBuilder(authorizable);
+    if (newAuthrizable.length() > 0) {
+      newAuthrizable.append(SentryConstants.AUTHORIZABLE_SEPARATOR);
+      newAuthrizable.append(SentryConstants.KV_JOINER.join(authzType, authzName));
+    } else {
+      newAuthrizable.append(SentryConstants.KV_JOINER.join(authzType, authzName));
+    }
+
+    return newAuthrizable;
+  }
+}
diff --git a/sentry-provider/sentry-provider-cache/src/main/java/org/apache/sentry/provider/cache/SimplePrivilegeCache.java b/sentry-provider/sentry-provider-cache/src/main/java/org/apache/sentry/provider/cache/SimplePrivilegeCache.java
index 5de3135..ad2b2ec 100644
--- a/sentry-provider/sentry-provider-cache/src/main/java/org/apache/sentry/provider/cache/SimplePrivilegeCache.java
+++ b/sentry-provider/sentry-provider-cache/src/main/java/org/apache/sentry/provider/cache/SimplePrivilegeCache.java
@@ -18,85 +18,20 @@
 
 import java.util.Collections;
 import org.apache.sentry.core.common.ActiveRoleSet;
-import org.apache.sentry.core.common.Authorizable;
-import org.apache.sentry.core.common.utils.SentryConstants;
-import org.apache.sentry.core.common.utils.KeyValue;
-import org.apache.sentry.core.model.db.DBModelAuthorizable.AuthorizableType;
-import org.apache.sentry.policy.common.CommonPrivilege;
-import org.apache.sentry.policy.common.Privilege;
 
-import java.util.Set;
 import java.util.HashSet;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.List;
-import java.util.LinkedList;
+import java.util.Set;
 
 /*
  * The class is used for saving and getting user's privileges when do the hive command like "show tables".
- * This will enhance the performance for the hive metadata filter. This class is not thread safe.
+ * This will enhance the performance for the hive metadata filter.
  */
 public class SimplePrivilegeCache implements PrivilegeCache {
 
   private Set<String> cachedPrivileges;
 
-  // <Authorizable, Set<PrivilegeObject>> map, this is a cache for mapping authorizable
-  // to corresponding set of privilege objects.
-  // e.g. (server=server1->database=b1, (server=server1->database=b1->action=insert))
-  private final Map<String, Set<String>> cachedAuthzPrivileges = new HashMap<>();
-
-  // <AuthorizableType, Set<AuthorizableValue>> wild card map
-  private final Map<String, Set<String>> wildCardAuthz = new HashMap<>();
-
   public SimplePrivilegeCache(Set<String> cachedPrivileges) {
     this.cachedPrivileges = cachedPrivileges;
-
-    for (String cachedPrivilege : cachedPrivileges) {
-      Privilege privilege = new CommonPrivilege(cachedPrivilege);
-      List<KeyValue> authorizable = privilege.getAuthorizable();
-      String authzString = getAuthzString(authorizable);
-      updateWildCardAuthzMap(authorizable);
-
-      Set<String> authzPrivileges = cachedAuthzPrivileges.get(authzString);
-      if (authzPrivileges == null) {
-        authzPrivileges = new HashSet();
-        cachedAuthzPrivileges.put(authzString, authzPrivileges);
-      }
-      authzPrivileges.add(cachedPrivilege);
-    }
-  }
-
-  private String getAuthzString(List<KeyValue> authoriable) {
-    List<KeyValue> authz = new LinkedList<>();
-    for (KeyValue auth : authoriable) {
-
-      // For authorizable e.g. sever=server1->uri=hdfs://namenode:8020/path/,
-      // use sever=server1 as the key of cachedAuthzPrivileges, since
-      // cannot do string matchinf on URI paths.
-      if (!AuthorizableType.URI.toString().equalsIgnoreCase(auth.getKey())) {
-        authz.add(auth);
-      }
-    }
-
-    return SentryConstants.AUTHORIZABLE_JOINER.join(authz);
-  }
-
-  private void updateWildCardAuthzMap(List<KeyValue> authz) {
-    for (KeyValue auth : authz) {
-      String authKey = auth.getKey().toLowerCase();
-      String authValue = auth.getValue().toLowerCase();
-      Set<String> authzValue = wildCardAuthz.get(authKey);
-
-      if (authzValue != null ) {
-        if (!authzValue.contains(authValue)) {
-          authzValue.add(authValue);
-        }
-      } else {
-        authzValue = new HashSet<>();
-        authzValue.add(authValue);
-        wildCardAuthz.put(authKey, authzValue);
-      }
-    }
   }
 
   // return the cached privileges
@@ -122,97 +57,4 @@
     }
     return cachedPrivileges;
   }
-
-  @Override
-  public Set<String> listPrivileges(Set<String> groups, Set<String> users, ActiveRoleSet roleSet,
-      Authorizable... authorizationHierarchy) {
-    Set<String> privileges = new HashSet<>();
-    Set<StringBuilder> authzKeys = getAuthzKeys(authorizationHierarchy);
-    for (StringBuilder authzKey : authzKeys) {
-      if (cachedAuthzPrivileges.get(authzKey.toString()) != null) {
-        privileges.addAll(cachedAuthzPrivileges.get(authzKey.toString()));
-      }
-    }
-
-    return privileges;
-  }
-
-  /**
-   * Get authoriables from the <Authorizable, Set<PrivilegeObject>> cache map,
-   * based on the authorizable hierarchy. This logic follows Privilege.implies.
-   * e.g. given authorizable hierarchy:server=server1->db=db1, returns matched
-   * privileges including server=server1;server=*;server=server1->db=db1;server=server1->db=*.
-   * @param authorizationHierarchy
-   * @return
-   */
-  private Set<StringBuilder> getAuthzKeys(Authorizable... authorizationHierarchy) {
-    Set<StringBuilder> targets = new HashSet<>();
-    for (Authorizable auth : authorizationHierarchy) {
-      String authzType = auth.getTypeName().toLowerCase();
-      String authzName = auth.getName().toLowerCase();
-
-      // No op for URI authorizable type.
-      if (authzType.equalsIgnoreCase(AuthorizableType.URI.toString())) {
-        continue;
-      }
-      // If authorizable name is a wild card, need to add all possible authorizable objects
-      // basesd on the authorizable type.
-      if (authzName.equals(SentryConstants.RESOURCE_WILDCARD_VALUE) ||
-          authzName.equals(SentryConstants.RESOURCE_WILDCARD_VALUE_SOME)||
-          authzName.equals(SentryConstants.RESOURCE_WILDCARD_VALUE_ALL)) {
-        Set<String> wildcardValues = wildCardAuthz.get(authzType);
-
-        if (wildcardValues != null && wildcardValues.size() > 0) {
-          Set<StringBuilder> newTargets = new HashSet<>(targets);
-          for (StringBuilder target : targets) {
-            for (String wildcardValue : wildcardValues) {
-              newTargets.add(addAuthz(target, authzType, wildcardValue));
-            }
-          }
-
-          targets = newTargets;
-        } else {
-          return targets;
-        }
-      } else {
-        if (targets.isEmpty()) {
-          targets.add(addAuthz(new StringBuilder(), authzType, authzName));
-
-          // Add wild card * search, e.g server=*, server=ALL
-          targets.add(addAuthz(new StringBuilder(), authzType,
-              SentryConstants.RESOURCE_WILDCARD_VALUE.toLowerCase()));
-          targets.add(addAuthz(new StringBuilder(), authzType,
-              SentryConstants.RESOURCE_WILDCARD_VALUE_ALL.toLowerCase()));
-        } else {
-          Set<StringBuilder> newTargets = new HashSet<>(targets);
-
-          for (StringBuilder target : targets) {
-            newTargets.add(addAuthz(target, authzType, authzName));
-
-            // Add wild card * search, e.g server=server1->db=*, server=server1->db=ALL
-            newTargets.add(addAuthz(target, authzType,
-                SentryConstants.RESOURCE_WILDCARD_VALUE.toLowerCase()));
-            newTargets.add(addAuthz(target, authzType,
-                SentryConstants.RESOURCE_WILDCARD_VALUE_ALL.toLowerCase()));
-          }
-
-          targets = newTargets;
-        }
-      }
-    }
-
-    return targets;
-  }
-
-  private StringBuilder addAuthz(StringBuilder authorizable, String authzType, String authzName) {
-    StringBuilder newAuthrizable = new StringBuilder(authorizable);
-    if (newAuthrizable.length() > 0) {
-      newAuthrizable.append(SentryConstants.AUTHORIZABLE_SEPARATOR);
-      newAuthrizable.append(SentryConstants.KV_JOINER.join(authzType, authzName));
-    } else {
-      newAuthrizable.append(SentryConstants.KV_JOINER.join(authzType, authzName));
-    }
-
-    return newAuthrizable;
-  }
-}
+}
\ No newline at end of file
diff --git a/sentry-provider/sentry-provider-cache/src/main/java/org/apache/sentry/provider/cache/TreePrivilegeCache.java b/sentry-provider/sentry-provider-cache/src/main/java/org/apache/sentry/provider/cache/TreePrivilegeCache.java
new file mode 100644
index 0000000..a044145
--- /dev/null
+++ b/sentry-provider/sentry-provider-cache/src/main/java/org/apache/sentry/provider/cache/TreePrivilegeCache.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.provider.cache;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang.StringUtils;
+import org.apache.sentry.core.common.ActiveRoleSet;
+import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.core.common.utils.SentryConstants;
+import org.apache.sentry.policy.common.CommonPrivilege;
+import org.apache.sentry.policy.common.Privilege;
+import org.apache.sentry.policy.common.PrivilegeFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * This class is used to cache user's privileges at the hive command such as "show tables".
+ * This cache enhances the performance for the hive metadata filter. This class is not thread safe.
+ * It keeps the privilege objects and organizes them based on their hierarchy
+ */
+public class TreePrivilegeCache implements FilteredPrivilegeCache {
+
+  private final Set<String> cachedPrivileges;
+  private final PrivilegeFactory privilegeFactory;
+  private final Map<String, TreePrivilegeNode> cachedPrivilegeMap;
+
+  private static final Logger LOGGER = LoggerFactory
+    .getLogger(TreePrivilegeCache.class);
+
+  public TreePrivilegeCache(Set<String> cachedPrivileges, PrivilegeFactory inPrivilegeFactory) {
+    if (cachedPrivileges == null) {
+      cachedPrivileges = new HashSet<String>();
+    }
+
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Created with privileges {}", cachedPrivileges);
+    }
+
+    this.cachedPrivileges = cachedPrivileges;
+    this.privilegeFactory = inPrivilegeFactory;
+    this.cachedPrivilegeMap = createPrivilegeMap(cachedPrivileges);
+  }
+
+  @Override
+  public Set<String> listPrivileges(Set<String> groups, ActiveRoleSet roleSet) {
+    return cachedPrivileges;
+  }
+
+  @Override
+  public Set<String> listPrivileges(Set<String> groups, Set<String> users, ActiveRoleSet roleSet) {
+    return cachedPrivileges;
+  }
+
+  @Override
+  public Set<String> listPrivileges(Set<String> groups, Set<String> users, ActiveRoleSet roleSet,
+    Authorizable... authorizationhierarchy) {
+    Set<Privilege> privilegeObjects = listPrivilegeObjects(groups, users, roleSet, authorizationhierarchy);
+
+    return privilegeObjects.stream()
+      .filter(priObj -> priObj != null)
+      .map(priObj -> priObj.toString())
+      .collect(Collectors.toSet());
+  }
+
+  @Override
+  public Set<Privilege> listPrivilegeObjects(Set<String> groups, Set<String> users,
+    ActiveRoleSet roleSet, Authorizable... authorizationhierarchy) {
+    Set<String> topResourceValues = getTopLevelResourceValues(authorizationhierarchy);
+    Set<Privilege> targetSet = new HashSet<>();
+    for (String topResourceValue : topResourceValues) {
+      if (StringUtils.isEmpty(topResourceValue)) {
+        continue;
+      }
+
+      TreePrivilegeNode topNode = cachedPrivilegeMap.get(topResourceValue);
+      if (topNode == null) {
+        continue;
+      }
+
+      targetSet.addAll(topNode.listPrivilegeObjects(0, authorizationhierarchy));
+    }
+
+    return targetSet;
+  }
+
+  @Override
+  public void close() {
+    // Keep the privileges to be consistent with cache implementation in Impala
+  }
+
+  private Privilege getPrivilegeObject(String priString) {
+    if (privilegeFactory != null) {
+      return privilegeFactory.createPrivilege(priString);
+    }
+
+    return new CommonPrivilege(priString);
+  }
+
+  private Map<String, TreePrivilegeNode> createPrivilegeMap(Set<String> cachedPrivileges) {
+    Map<String, TreePrivilegeNode> privilegeNodeMap = new HashMap<>();
+
+    for (String priString : cachedPrivileges) {
+      Privilege currPrivilege = getPrivilegeObject(priString);
+
+      String topKey = getTopLevelResourceValue(currPrivilege);
+      if (StringUtils.isEmpty(topKey)) {
+        LOGGER.warn("The top level authorizable of privilege {} is null", priString);
+        continue;
+      }
+
+      TreePrivilegeNode matchedNode = privilegeNodeMap.get(topKey);
+      if (matchedNode == null) {
+        matchedNode = new TreePrivilegeNode();
+        privilegeNodeMap.put(topKey, matchedNode);
+      }
+
+      matchedNode.addPrivilege(currPrivilege, 0);
+    }
+
+    return privilegeNodeMap;
+  }
+
+  private String getTopLevelResourceValue(Privilege inPrivilege) {
+    return TreePrivilegeNode.getResourceValue(0, inPrivilege);
+  }
+
+  private String getTopLevelResourceValue(Authorizable[] authorizables) {
+    return TreePrivilegeNode.getResourceValue(0, authorizables);
+  }
+
+  private Set<String> getTopLevelResourceValues(Authorizable[] authorizables) {
+    Set<String> keys = new HashSet<>();
+    keys.add(SentryConstants.RESOURCE_WILDCARD_VALUE.toLowerCase());
+    keys.add(SentryConstants.RESOURCE_WILDCARD_VALUE_SOME.toLowerCase());
+    keys.add(SentryConstants.RESOURCE_WILDCARD_VALUE_ALL.toLowerCase());
+
+    String topKey = getTopLevelResourceValue(authorizables);
+    if (!StringUtils.isEmpty(topKey)) {
+      keys.add(topKey);
+    }
+
+    return keys;
+  }
+}
diff --git a/sentry-provider/sentry-provider-cache/src/main/java/org/apache/sentry/provider/cache/TreePrivilegeNode.java b/sentry-provider/sentry-provider-cache/src/main/java/org/apache/sentry/provider/cache/TreePrivilegeNode.java
new file mode 100644
index 0000000..4aa65ef
--- /dev/null
+++ b/sentry-provider/sentry-provider-cache/src/main/java/org/apache/sentry/provider/cache/TreePrivilegeNode.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.provider.cache;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang.StringUtils;
+import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.core.common.utils.KeyValue;
+import org.apache.sentry.core.common.utils.SentryConstants;
+import org.apache.sentry.core.model.db.DBModelAuthorizable.AuthorizableType;
+import org.apache.sentry.policy.common.Privilege;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is used to store value of the <key, value> used in TreePrivilegeCache
+ */
+public class TreePrivilegeNode {
+  Set<Privilege> ownPrivileges;
+  Set<Privilege> childWildcardPrivileges;
+
+  /** the key of childPrivileges is the resource value of the next level hierarchy, i.e., child resource value
+   *  For example, a privilege "server=server1->db=db1->table=table2->column=col3->action=select"
+   *  will be put into the following data structure
+   *  TreePrivilegeCache.cachedPrivilegeMap[server1]
+   *    -> TreePrivilegeNode.childPrivileges[db1]        (partIndex = 0, for node of resource value: server1)
+   *      -> TreePrivilegeNode.childPrivileges[table2]   (partIndex = 1, for node of resource value: db1)
+   *        -> TreePrivilegeNode.childPrivileges[col3]   (partIndex = 2, for node of resource value: table2)
+   *          -> TreePrivilegeNode.ownPrivileges         (partIndex = 3, for node of resource value: col3)
+   *
+   *  A privilege "server=server1->db=db1->table=table2->column=*->action=select"
+   *  will be put into the following data structure
+   *  TreePrivilegeCache.cachedPrivilegeMap[server1]
+   *    -> TreePrivilegeNode.childPrivileges[db1]        (partIndex = 0, for node of resource value: server1)
+   *      -> TreePrivilegeNode.childPrivileges[table2]   (partIndex = 1, for node of resource value: db1)
+   *        -> TreePrivilegeNode.childWildcardPrivileges (partIndex = 2, for node of resource value: table2)
+   *
+   */
+  Map<String, TreePrivilegeNode> childPrivileges;
+
+  private static final Logger LOGGER = LoggerFactory
+    .getLogger(TreePrivilegeNode.class);
+
+  public TreePrivilegeNode () {
+  }
+
+  public void addPrivilege(final Privilege inPrivilege, int partIndex) {
+    if (isOwnPrivilege(inPrivilege, partIndex)) {
+      if (ownPrivileges == null) {
+        ownPrivileges = new HashSet<>();
+      }
+
+      ownPrivileges.add(inPrivilege);
+      return;
+    }
+
+    // find the child resource value, which is used as key in childPrivileges
+    String childResourceValue = getResourceValue(partIndex + 1, inPrivilege);
+    if (StringUtils.isEmpty(childResourceValue)) {
+      LOGGER.warn("Child resource value at index [{}] of privilege {} is null", partIndex, inPrivilege.toString());
+      return;
+    }
+
+    if (isResourceValueWildcard(childResourceValue)) {
+      if (childWildcardPrivileges == null) {
+        childWildcardPrivileges = new HashSet<>();
+      }
+
+      childWildcardPrivileges.add(inPrivilege);
+      return;
+    }
+
+    if (childPrivileges == null) {
+      childPrivileges = new HashMap<>();
+    }
+
+    TreePrivilegeNode childNode = childPrivileges.get(childResourceValue);
+    if (childNode == null) {
+      childNode = new TreePrivilegeNode();
+      childPrivileges.put(childResourceValue, childNode);
+    }
+
+    childNode.addPrivilege(inPrivilege, partIndex + 1);
+  }
+
+  /**
+   * Return the set of privileges that could match the authorizable, including own, child wild card, and
+   * matched child privileges.
+   * @param authorizationhierarchy list of authorizable in the order of server, db, table, column.
+   * @param partIndex the current index of the list of authorizable
+   * @return
+   */
+  public Set<Privilege> listPrivilegeObjects(int partIndex, Authorizable... authorizationhierarchy) {
+    if (authorizationhierarchy.length < partIndex + 1) {
+      return null;
+    }
+
+    Set<Privilege> targetSet = new HashSet<>();
+    if (ownPrivileges != null) {
+      targetSet.addAll(ownPrivileges);
+    }
+
+    if ((childWildcardPrivileges != null) && (authorizationhierarchy.length > partIndex + 1)) {
+      // only add when the child authorizable is included
+      targetSet.addAll(childWildcardPrivileges);
+    }
+
+    Set<Privilege> childPrivileges = listChildPrivilegeObjects(partIndex, authorizationhierarchy);
+
+    if (childPrivileges != null) {
+      targetSet.addAll(childPrivileges);
+    }
+
+    return targetSet;
+  }
+
+  // Check if there is child to process
+  // true: yes; false: reach to the end of the hierarchy, and there is no more child to process
+  private static boolean hasChild(int partIndex, int totalLevel) {
+    if (totalLevel <= partIndex + 1) {
+      return false;
+    }
+
+    return true;
+  }
+
+  // Check if there is own data to process
+  // true: yes; false: reach to the end of the hierarchy, and there is no more data to process
+  private static boolean hasOwn(int partIndex, int totalLevel) {
+    if (totalLevel < partIndex + 1) {
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Return the set of privileges that could match the authorizable, only including matched child privileges.
+   * @param partIndex
+   * @param authorizationhierarchy
+   * @return
+   */
+  private Set<Privilege> listChildPrivilegeObjects(int partIndex, Authorizable... authorizationhierarchy) {
+
+    if (!hasChild(partIndex, authorizationhierarchy.length)) {
+      return null;
+    }
+
+    String childKey = getResourceValue(partIndex + 1, authorizationhierarchy);
+    if (StringUtils.isEmpty(childKey)) {
+      return null;
+    }
+
+    if (isResourceValueWildcard(childKey)) {
+      // the authorizable for child is wildcard, so return the privileges of all children.
+      return listAllChildPrivilegeObjects(partIndex, authorizationhierarchy);
+    }
+
+    // the authorizable for child is for a specific child, return its own privileges.
+    if (childPrivileges == null) {
+      return null;
+    }
+    
+    TreePrivilegeNode childNode = childPrivileges.get(childKey);
+    if (childNode == null) {
+      return null;
+    }
+
+    return childNode.listPrivilegeObjects(partIndex + 1, authorizationhierarchy);
+  }
+
+  private Set<Privilege> listAllChildPrivilegeObjects(int partIndex, Authorizable... authorizationhierarchy) {
+    if (childPrivileges == null) {
+      return null;
+    }
+
+    Set<Privilege> targetPrivileges = new HashSet<>();
+
+    for (TreePrivilegeNode childNode : childPrivileges.values()) {
+      Set<Privilege> childSet = childNode.listPrivilegeObjects(partIndex + 1, authorizationhierarchy);
+      if ((childSet == null) || (childSet.size() == 0)) {
+        continue;
+      }
+
+      targetPrivileges.addAll(childSet);
+    }
+
+    return targetPrivileges;
+  }
+
+  private boolean isOwnPrivilege(Privilege inPrivilege, int partIndex) {
+    List<KeyValue> parts = inPrivilege.getParts();
+    if (!hasChild(partIndex, parts.size())) {
+      return true;
+    }
+
+    // check child resource type
+    String partType = parts.get(partIndex + 1).getKey();
+    if (SentryConstants.PRIVILEGE_NAME.equalsIgnoreCase(partType)) {
+      // the next part is action, not authorizable
+      return true;
+    }
+
+    if (AuthorizableType.URI.toString().equalsIgnoreCase(partType)) {
+      // the next part is uri
+      return true;
+    }
+
+    return false;
+  }
+
+  public static boolean isResourceValueWildcard(String resourceValue) {
+    if (StringUtils.isEmpty(resourceValue)) {
+      return false;
+    }
+
+    if (SentryConstants.RESOURCE_WILDCARD_VALUE.equalsIgnoreCase(resourceValue)) {
+      return true;
+    }
+
+    if (SentryConstants.RESOURCE_WILDCARD_VALUE_SOME.equalsIgnoreCase(resourceValue)) {
+      return true;
+    }
+
+    if (SentryConstants.RESOURCE_WILDCARD_VALUE_ALL.equalsIgnoreCase(resourceValue)) {
+      return true;
+    }
+
+    return false;
+  }
+
+  public static String getResourceValue(int partIndex, Authorizable[] authorizables) {
+    if ((authorizables == null) || (authorizables.length == 0) ) {
+      return null;
+    }
+
+    if (!hasOwn(partIndex, authorizables.length)) {
+      return null;
+    }
+
+    Authorizable ownPart = authorizables[partIndex];
+
+    if (ownPart == null) {
+      return null;
+    }
+
+    return ownPart.getName().toLowerCase();
+  }
+
+  public static String getResourceValue(int partIndex, Privilege inPrivilege) {
+    List<KeyValue> parts = inPrivilege.getParts();
+
+    if (parts == null) {
+      return null;
+    }
+
+    if (!hasOwn(partIndex, parts.size())) {
+      return null;
+    }
+
+    KeyValue ownPart = parts.get(partIndex);
+
+    if (ownPart == null) {
+      return null;
+    }
+
+    return ownPart.getValue().toLowerCase();
+  }
+}
diff --git a/sentry-provider/sentry-provider-cache/src/test/java/org/apache/sentry/provider/cache/PrivilegeCacheTestImpl.java b/sentry-provider/sentry-provider-cache/src/test/java/org/apache/sentry/provider/cache/PrivilegeCacheTestImpl.java
index f2f735b..90587e0 100644
--- a/sentry-provider/sentry-provider-cache/src/test/java/org/apache/sentry/provider/cache/PrivilegeCacheTestImpl.java
+++ b/sentry-provider/sentry-provider-cache/src/test/java/org/apache/sentry/provider/cache/PrivilegeCacheTestImpl.java
@@ -17,15 +17,19 @@
 
 package org.apache.sentry.provider.cache;
 
+import com.google.common.collect.ImmutableSet;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Set;
 
+import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.sentry.core.common.ActiveRoleSet;
 import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.policy.common.Privilege;
+import org.apache.sentry.policy.common.PrivilegeFactory;
 import org.apache.sentry.provider.common.ProviderBackendContext;
 import org.apache.sentry.core.common.utils.PolicyFiles;
 import org.apache.sentry.provider.file.SimpleFileProviderBackend;
@@ -35,11 +39,12 @@
 /**
  * Test cache provider that is a wrapper on top of File based provider
  */
-public class PrivilegeCacheTestImpl implements PrivilegeCache {
+public class PrivilegeCacheTestImpl implements FilteredPrivilegeCache {
   private static final String resourcePath = "test-authz-provider-local-group-mapping.ini";
 
   private SimpleFileProviderBackend backend;
   private File baseDir;
+  private PrivilegeFactory privilegeFactory;
 
   public PrivilegeCacheTestImpl() throws FileNotFoundException, IOException {
     baseDir = Files.createTempDir();
@@ -72,4 +77,19 @@
       Authorizable... authorizationhierarchy) {
     return backend.getPrivileges(groups, users, roleSet, authorizationhierarchy);
   }
+
+  @Override
+  public Set<Privilege> listPrivilegeObjects(Set<String> groups, Set<String> users, ActiveRoleSet roleSet,
+      Authorizable... authorizationHierarchy) {
+    if (privilegeFactory == null) {
+      return ImmutableSet.of();
+    }
+
+    Set<String> privilegeStrings = listPrivileges(groups, users, roleSet, authorizationHierarchy);
+
+    return privilegeStrings.stream()
+        .filter(priString -> priString != null)
+        .map(priString -> privilegeFactory.createPrivilege(priString))
+        .collect(Collectors.toSet());
+  }
 }
diff --git a/sentry-provider/sentry-provider-cache/src/test/java/org/apache/sentry/provider/cache/TestSimpleFilteredPrivilegeCache.java b/sentry-provider/sentry-provider-cache/src/test/java/org/apache/sentry/provider/cache/TestSimpleFilteredPrivilegeCache.java
new file mode 100644
index 0000000..4615bd4
--- /dev/null
+++ b/sentry-provider/sentry-provider-cache/src/test/java/org/apache/sentry/provider/cache/TestSimpleFilteredPrivilegeCache.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.provider.cache;
+
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.core.common.utils.KeyValue;
+import org.apache.sentry.core.common.utils.SentryConstants;
+import org.apache.sentry.core.model.db.Column;
+import org.apache.sentry.core.model.db.Database;
+import org.apache.sentry.core.model.db.Table;
+import org.apache.sentry.policy.common.CommonPrivilege;
+import org.apache.sentry.policy.common.Privilege;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.sentry.core.model.db.Server;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestSimpleFilteredPrivilegeCache {
+
+  @Test
+  public void testListPrivilegesCaseSensitivity() {
+    CommonPrivilege dbSelect = create(new KeyValue("Server", "Server1"),
+    new KeyValue("db", "db1"), new KeyValue("action", "SELECT"));
+
+    SimpleFilteredPrivilegeCache cache = new SimpleFilteredPrivilegeCache(Sets.newHashSet(dbSelect.toString()), null);
+    assertEquals(1, cache.listPrivileges(null, null, null,
+        new Server("server1"), new Database("db1")).size());
+  }
+
+  @Test
+  public void testListPrivilegesWildCard() {
+    CommonPrivilege t1D1Select = create(new KeyValue("Server", "server1"),
+        new KeyValue("db", "db1"), new KeyValue("table", "t1"), new KeyValue("action", "SELECT"));
+    CommonPrivilege t1D2Select = create(new KeyValue("Server", "server1"),
+        new KeyValue("db", "db2"), new KeyValue("table", "t1"), new KeyValue("action", "SELECT"));
+    CommonPrivilege t2Select = create(new KeyValue("Server", "server1"),
+        new KeyValue("db", "db1"), new KeyValue("table", "t2"), new KeyValue("action", "SELECT"));
+    CommonPrivilege wildCardTable = create(new KeyValue("Server", "server1"),
+        new KeyValue("db", "db1"), new KeyValue("table", "*"), new KeyValue("action", "SELECT"));
+    CommonPrivilege allTable = create(new KeyValue("Server", "server1"),
+        new KeyValue("db", "db1"), new KeyValue("table", "ALL"), new KeyValue("action", "SELECT"));
+    CommonPrivilege allDatabase = create(new KeyValue("Server", "server1"),
+        new KeyValue("db", "*"));
+    CommonPrivilege colSelect = create(new KeyValue("Server", "server1"),
+        new KeyValue("db", "db1"), new KeyValue("table", "t1"), new KeyValue("column", "c1"), new KeyValue("action", "SELECT"));
+
+    SimpleFilteredPrivilegeCache cache = new SimpleFilteredPrivilegeCache(Sets.newHashSet(t1D1Select.toString(),
+        t1D2Select.toString(), t2Select.toString(), wildCardTable.toString(), allTable.toString(),
+        allDatabase.toString(), colSelect.toString()), null);
+
+    assertEquals(0, cache.listPrivileges(null, null, null, new Server("server1")).size());
+    assertEquals(1, cache.listPrivileges(null, null, null, new Server("server1"), new Database("db1")).size());
+    assertEquals(1, cache.listPrivileges(null, null, null, new Server("server1"), new Database("db2")).size());
+    assertEquals(4, cache.listPrivileges(null, null, null, new Server("server1"), new Database("db1"), new Table("t1")).size());
+    assertEquals(5, cache.listPrivileges(null, null, null, new Server("server1"), new Database("db1"), new Table("*")).size());
+    assertEquals(5, cache.listPrivileges(null, null, null, new Server("server1"), new Database("db1"), new Table("t1"), new Column("*")).size());
+    assertEquals(6, cache.listPrivileges(null, null, null, new Server("server1"), new Database("db1"), new Table("*"), new Column("*")).size());
+    assertEquals(2, cache.listPrivileges(null, null, null, new Server("server1"), new Database("db2"), new Table("t1"), new Column("*")).size());
+  }
+
+  @Test
+  public void testListPrivilegeObjectsWildCard() {
+    CommonPrivilege t1D1Select = create(new KeyValue("Server", "server1"),
+      new KeyValue("db", "db1"), new KeyValue("table", "t1"), new KeyValue("action", "SELECT"));
+    CommonPrivilege t1D2Select = create(new KeyValue("Server", "server1"),
+      new KeyValue("db", "db2"), new KeyValue("table", "t1"), new KeyValue("action", "SELECT"));
+    CommonPrivilege t2Select = create(new KeyValue("Server", "server1"),
+      new KeyValue("db", "db1"), new KeyValue("table", "t2"), new KeyValue("action", "SELECT"));
+    CommonPrivilege wildCardTable = create(new KeyValue("Server", "server1"),
+      new KeyValue("db", "db1"), new KeyValue("table", "*"), new KeyValue("action", "SELECT"));
+    CommonPrivilege allTable = create(new KeyValue("Server", "server1"),
+      new KeyValue("db", "db1"), new KeyValue("table", "ALL"), new KeyValue("action", "SELECT"));
+    CommonPrivilege allDatabase = create(new KeyValue("Server", "server1"),
+      new KeyValue("db", "*"));
+    CommonPrivilege colSelect = create(new KeyValue("Server", "server1"),
+      new KeyValue("db", "db1"), new KeyValue("table", "t1"), new KeyValue("column", "c1"), new KeyValue("action", "SELECT"));
+
+    SimpleFilteredPrivilegeCache cache = new SimpleFilteredPrivilegeCache(Sets.newHashSet(t1D1Select.toString(),
+      t1D2Select.toString(), t2Select.toString(), wildCardTable.toString(), allTable.toString(),
+      allDatabase.toString(), colSelect.toString()), null);
+
+    assertEquals(0, cache.listPrivilegeObjects(null, null, null, new Server("server1")).size());
+    assertEquals(1, cache.listPrivilegeObjects(null, null, null, new Server("server1"), new Database("db1")).size());
+    assertEquals(1, cache.listPrivilegeObjects(null, null, null, new Server("server1"), new Database("db2")).size());
+    assertEquals(4, cache.listPrivilegeObjects(null, null, null, new Server("server1"), new Database("db1"), new Table("t1")).size());
+    assertEquals(5, cache.listPrivilegeObjects(null, null, null, new Server("server1"), new Database("db1"), new Table("*")).size());
+    assertEquals(5, cache.listPrivilegeObjects(null, null, null, new Server("server1"), new Database("db1"), new Table("t1"), new Column("*")).size());
+    assertEquals(6, cache.listPrivilegeObjects(null, null, null, new Server("server1"), new Database("db1"), new Table("*"), new Column("*")).size());
+    assertEquals(2, cache.listPrivilegeObjects(null, null, null, new Server("server1"), new Database("db2"), new Table("t1"), new Column("*")).size());
+  }
+
+  @Test
+  public void testListPrivilegesURI() {
+    CommonPrivilege uri1Select = create(new KeyValue("Server", "server1"),
+        new KeyValue("uri", "hdfs:///uri/path1"));
+    CommonPrivilege uri2Select = create(new KeyValue("Server", "server1"),
+        new KeyValue("uri", "hdfs:///uri/path2"));
+
+    SimpleFilteredPrivilegeCache cache = new SimpleFilteredPrivilegeCache(Sets.newHashSet(uri1Select.toString(),
+      uri2Select.toString()), null);
+
+    assertEquals(2, cache.listPrivileges(null, null, null, new Server("server1")).size());
+  }
+
+  @Test
+  @Ignore("This test should be run manually.")
+  public void testListPrivilegesPerf() {
+
+    Set<String> privileges = generatePrivilegeStrings(1000, 10);
+    SimpleFilteredPrivilegeCache cache = new SimpleFilteredPrivilegeCache(privileges, null);
+    List<Authorizable[]> authorizables = generateAuthoriables(12000, 10);
+
+    long start = System.currentTimeMillis();
+    for (Authorizable[] authorizableHierarchy : authorizables) {
+      Set<String> priStrings = cache.listPrivileges(null, null, null, authorizableHierarchy);
+      for (String priString : priStrings) {
+        CommonPrivilege currPri = create(priString);
+        currPri.getParts();
+      }
+    }
+    long end = System.currentTimeMillis();
+
+    System.out.println("SimplePrivilegeCache - total time on list string: " + (end - start) + " ms");
+  }
+
+  @Test
+  @Ignore("This test should be run manually.")
+  public void testListPrivilegeObjectsPerf() {
+
+    Set<String> privileges = generatePrivilegeStrings(1000, 10);
+    SimpleFilteredPrivilegeCache cache = new SimpleFilteredPrivilegeCache(privileges, null);
+    List<Authorizable[]> authorizables = generateAuthoriables(12000, 10);
+
+    long start = System.currentTimeMillis();
+    for (Authorizable[] authorizableHierarchy : authorizables) {
+      Set<Privilege> privilegeSet = cache.listPrivilegeObjects(null, null, null, authorizableHierarchy);
+      for (Privilege currPri : privilegeSet) {
+        currPri.getParts();
+      }
+    }
+    long end = System.currentTimeMillis();
+
+    System.out.println("SimplePrivilegeCache - total time on list obj: " + (end - start) + " ms");
+  }
+
+  Set<String> generatePrivilegeStrings(int dbCount, int tableCount) {
+    Set<String> priStrings = new HashSet<>();
+    for (int i = 0; i < dbCount; i ++) {
+      for (int j = 0; j < tableCount; j ++) {
+        String priString = "Server=server1->Database=db" + i + "->Table=table" + j;
+        priStrings.add(priString);
+      }
+    }
+
+    return priStrings;
+  }
+
+  List<Authorizable[]> generateAuthoriables(int dbCount, int tableCount) {
+    List<Authorizable[]> authorizables = new ArrayList<>();
+
+    for (int i = 0; i < dbCount; i ++) {
+      for (int j = 0; j < tableCount; j ++) {
+        Authorizable[] authorizable = new Authorizable[3];
+        authorizable[0] = new Server("server1");
+        authorizable[1] = new Database("db" + i);
+        authorizable[2] = new Table("table" + j);
+
+        authorizables.add(authorizable);
+      }
+    }
+
+    return authorizables;
+  }
+
+
+  static CommonPrivilege create(KeyValue... keyValues) {
+    return create(SentryConstants.AUTHORIZABLE_JOINER.join(keyValues));
+  }
+
+  static CommonPrivilege create(String s) {
+    return new CommonPrivilege(s);
+  }
+}
diff --git a/sentry-provider/sentry-provider-cache/src/test/java/org/apache/sentry/provider/cache/TestSimplePrivilegeCache.java b/sentry-provider/sentry-provider-cache/src/test/java/org/apache/sentry/provider/cache/TestSimplePrivilegeCache.java
deleted file mode 100644
index 891c1d9..0000000
--- a/sentry-provider/sentry-provider-cache/src/test/java/org/apache/sentry/provider/cache/TestSimplePrivilegeCache.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.provider.cache;
-
-import com.google.common.collect.Sets;
-import org.apache.sentry.core.common.utils.KeyValue;
-import org.apache.sentry.core.common.utils.SentryConstants;
-import org.apache.sentry.core.model.db.Database;
-import org.apache.sentry.core.model.db.Table;
-import org.apache.sentry.policy.common.CommonPrivilege;
-import org.junit.Test;
-
-import org.apache.sentry.core.model.db.Server;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestSimplePrivilegeCache {
-
-  @Test
-     public void testListPrivilegesCaseSensitivity() {
-    CommonPrivilege dbSelect = create(new KeyValue("Server", "Server1"),
-    new KeyValue("db", "db1"), new KeyValue("action", "SELECT"));
-
-    SimplePrivilegeCache cache = new SimplePrivilegeCache(Sets.newHashSet(dbSelect.toString()));
-    assertEquals(1, cache.listPrivileges(null, null, null,
-        new Server("server1"), new Database("db1")).size());
-  }
-
-  @Test
-  public void testListPrivilegesWildCard() {
-    CommonPrivilege t1D1Select = create(new KeyValue("Server", "server1"),
-        new KeyValue("db", "db1"), new KeyValue("table", "t1"), new KeyValue("action", "SELECT"));
-    CommonPrivilege t1D2Select = create(new KeyValue("Server", "server1"),
-        new KeyValue("db", "db2"), new KeyValue("table", "t1"), new KeyValue("action", "SELECT"));
-    CommonPrivilege t2Select = create(new KeyValue("Server", "server1"),
-        new KeyValue("db", "db1"), new KeyValue("table", "t2"), new KeyValue("action", "SELECT"));
-    CommonPrivilege wildCardTable = create(new KeyValue("Server", "server1"),
-        new KeyValue("db", "db1"), new KeyValue("table", "*"), new KeyValue("action", "SELECT"));
-    CommonPrivilege allTable = create(new KeyValue("Server", "server1"),
-        new KeyValue("db", "db1"), new KeyValue("table", "ALL"), new KeyValue("action", "SELECT"));
-    CommonPrivilege allDatabase = create(new KeyValue("Server", "server1"),
-        new KeyValue("db", "*"));
-    CommonPrivilege colSelect = create(new KeyValue("Server", "server1"),
-        new KeyValue("db", "db1"), new KeyValue("table", "t1"), new KeyValue("col", "c1"), new KeyValue("action", "SELECT"));
-
-    SimplePrivilegeCache cache = new SimplePrivilegeCache(Sets.newHashSet(t1D1Select.toString(),
-        t1D2Select.toString(), t2Select.toString(), wildCardTable.toString(), allTable.toString(),
-        allDatabase.toString(), colSelect.toString()));
-
-    assertEquals(0, cache.listPrivileges(null, null, null, new Server("server1")).size());
-    assertEquals(1, cache.listPrivileges(null, null, null, new Server("server1"), new Database("db1")).size());
-    assertEquals(1, cache.listPrivileges(null, null, null, new Server("server1"), new Database("db2")).size());
-    assertEquals(4, cache.listPrivileges(null, null, null, new Server("server1"), new Database("db1"), new Table("t1")).size());
-  }
-
-  @Test
-  public void testListPrivilegesURI() {
-    CommonPrivilege uri1Select = create(new KeyValue("Server", "server1"),
-        new KeyValue("uri", "hdfs:///uri/path1"));
-    CommonPrivilege uri2Select = create(new KeyValue("Server", "server1"),
-        new KeyValue("uri", "hdfs:///uri/path2"));
-
-    SimplePrivilegeCache cache = new SimplePrivilegeCache(Sets.newHashSet(uri1Select.toString(),
-      uri2Select.toString()));
-
-    assertEquals(2, cache.listPrivileges(null, null, null, new Server("server1")).size());
-  }
-
-  static CommonPrivilege create(KeyValue... keyValues) {
-    return create(SentryConstants.AUTHORIZABLE_JOINER.join(keyValues));
-  }
-
-  static CommonPrivilege create(String s) {
-    return new CommonPrivilege(s);
-  }
-}
diff --git a/sentry-provider/sentry-provider-cache/src/test/java/org/apache/sentry/provider/cache/TestTreePrivilegeCache.java b/sentry-provider/sentry-provider-cache/src/test/java/org/apache/sentry/provider/cache/TestTreePrivilegeCache.java
new file mode 100644
index 0000000..de142ce
--- /dev/null
+++ b/sentry-provider/sentry-provider-cache/src/test/java/org/apache/sentry/provider/cache/TestTreePrivilegeCache.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.provider.cache;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.core.common.utils.KeyValue;
+import org.apache.sentry.core.common.utils.SentryConstants;
+import org.apache.sentry.core.model.db.Column;
+import org.apache.sentry.core.model.db.Database;
+import org.apache.sentry.core.model.db.Server;
+import org.apache.sentry.core.model.db.Table;
+import org.apache.sentry.policy.common.CommonPrivilege;
+import org.apache.sentry.policy.common.Privilege;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestTreePrivilegeCache {
+  @Test
+  public void testListPrivilegesCaseSensitivity() {
+    CommonPrivilege dbSelect = create(new KeyValue("Server", "Server1"),
+      new KeyValue("db", "db1"), new KeyValue("action", "SELECT"));
+
+    TreePrivilegeCache cache = new TreePrivilegeCache(Sets.newHashSet(dbSelect.toString()), null);
+    assertEquals(1, cache.listPrivileges(null, null, null,
+      new Server("server1"), new Database("db1")).size());
+  }
+
+  @Test
+  public void testListPrivilegesWildCard() {
+    CommonPrivilege t1D1Select = create(new KeyValue("Server", "server1"),
+      new KeyValue("db", "db1"), new KeyValue("table", "t1"), new KeyValue("action", "SELECT"));
+    CommonPrivilege t1D2Select = create(new KeyValue("Server", "server1"),
+      new KeyValue("db", "db2"), new KeyValue("table", "t1"), new KeyValue("action", "SELECT"));
+    CommonPrivilege t2Select = create(new KeyValue("Server", "server1"),
+      new KeyValue("db", "db1"), new KeyValue("table", "t2"), new KeyValue("action", "SELECT"));
+    CommonPrivilege wildCardTable = create(new KeyValue("Server", "server1"),
+      new KeyValue("db", "db1"), new KeyValue("table", "*"), new KeyValue("action", "SELECT"));
+    CommonPrivilege allTable = create(new KeyValue("Server", "server1"),
+      new KeyValue("db", "db1"), new KeyValue("table", "ALL"), new KeyValue("action", "SELECT"));
+    CommonPrivilege allDatabase = create(new KeyValue("Server", "server1"),
+      new KeyValue("db", "*"));
+    CommonPrivilege colSelect = create(new KeyValue("Server", "server1"),
+      new KeyValue("db", "db1"), new KeyValue("table", "t1"), new KeyValue("column", "c1"), new KeyValue("action", "SELECT"));
+
+    TreePrivilegeCache cache = new TreePrivilegeCache(Sets.newHashSet(t1D1Select.toString(),
+      t1D2Select.toString(), t2Select.toString(), wildCardTable.toString(), allTable.toString(),
+      allDatabase.toString(), colSelect.toString()), null);
+
+    assertEquals(0, cache.listPrivileges(null, null, null, new Server("server1")).size());
+    assertEquals(1, cache.listPrivileges(null, null, null, new Server("server1"), new Database("db1")).size());
+    assertEquals(1, cache.listPrivileges(null, null, null, new Server("server1"), new Database("db2")).size());
+    assertEquals(4, cache.listPrivileges(null, null, null, new Server("server1"), new Database("db1"), new Table("t1")).size());
+    assertEquals(5, cache.listPrivileges(null, null, null, new Server("server1"), new Database("db1"), new Table("*")).size());
+    assertEquals(5, cache.listPrivileges(null, null, null, new Server("server1"), new Database("db1"), new Table("t1"), new Column("*")).size());
+    assertEquals(6, cache.listPrivileges(null, null, null, new Server("server1"), new Database("db1"), new Table("*"), new Column("*")).size());
+    assertEquals(2, cache.listPrivileges(null, null, null, new Server("server1"), new Database("db2"), new Table("t1"), new Column("*")).size());
+  }
+
+  @Test
+  public void testListPrivilegeObjectsWildCard() {
+    CommonPrivilege t1D1Select = create(new KeyValue("Server", "server1"),
+      new KeyValue("db", "db1"), new KeyValue("table", "t1"), new KeyValue("action", "SELECT"));
+    CommonPrivilege t1D2Select = create(new KeyValue("Server", "server1"),
+      new KeyValue("db", "db2"), new KeyValue("table", "t1"), new KeyValue("action", "SELECT"));
+    CommonPrivilege t2Select = create(new KeyValue("Server", "server1"),
+      new KeyValue("db", "db1"), new KeyValue("table", "t2"), new KeyValue("action", "SELECT"));
+    CommonPrivilege wildCardTable = create(new KeyValue("Server", "server1"),
+      new KeyValue("db", "db1"), new KeyValue("table", "*"), new KeyValue("action", "SELECT"));
+    CommonPrivilege allTable = create(new KeyValue("Server", "server1"),
+      new KeyValue("db", "db1"), new KeyValue("table", "ALL"), new KeyValue("action", "SELECT"));
+    CommonPrivilege allDatabase = create(new KeyValue("Server", "server1"),
+      new KeyValue("db", "*"));
+    CommonPrivilege colSelect = create(new KeyValue("Server", "server1"),
+      new KeyValue("db", "db1"), new KeyValue("table", "t1"), new KeyValue("column", "c1"), new KeyValue("action", "SELECT"));
+
+    TreePrivilegeCache cache = new TreePrivilegeCache(Sets.newHashSet(t1D1Select.toString(),
+      t1D2Select.toString(), t2Select.toString(), wildCardTable.toString(), allTable.toString(),
+      allDatabase.toString(), colSelect.toString()), null);
+
+    assertEquals(0, cache.listPrivilegeObjects(null, null, null, new Server("server1")).size());
+    assertEquals(1, cache.listPrivilegeObjects(null, null, null, new Server("server1"), new Database("db1")).size());
+    assertEquals(1, cache.listPrivilegeObjects(null, null, null, new Server("server1"), new Database("db2")).size());
+    assertEquals(4, cache.listPrivilegeObjects(null, null, null, new Server("server1"), new Database("db1"), new Table("t1")).size());
+    assertEquals(5, cache.listPrivilegeObjects(null, null, null, new Server("server1"), new Database("db1"), new Table("*")).size());
+    assertEquals(5, cache.listPrivilegeObjects(null, null, null, new Server("server1"), new Database("db1"), new Table("t1"), new Column("*")).size());
+    assertEquals(6, cache.listPrivilegeObjects(null, null, null, new Server("server1"), new Database("db1"), new Table("*"), new Column("*")).size());
+    assertEquals(2, cache.listPrivilegeObjects(null, null, null, new Server("server1"), new Database("db2"), new Table("t1"), new Column("*")).size());
+  }
+
+  @Test
+  public void testListPrivilegesURI() {
+    CommonPrivilege uri1Select = create(new KeyValue("Server", "server1"),
+      new KeyValue("uri", "hdfs:///uri/path1"));
+    CommonPrivilege uri2Select = create(new KeyValue("Server", "server1"),
+      new KeyValue("uri", "hdfs:///uri/path2"));
+
+    TreePrivilegeCache cache = new TreePrivilegeCache(Sets.newHashSet(uri1Select.toString(),
+      uri2Select.toString()), null);
+
+    assertEquals(2, cache.listPrivileges(null, null, null, new Server("server1")).size());
+  }
+
+  @Test
+  @Ignore("This test should be run manually.")
+  public void testListPrivilegesPerf() {
+
+    Set<String> privileges = generatePrivilegeStrings(1000, 10);
+    TreePrivilegeCache cache = new TreePrivilegeCache(privileges, null);
+    List<Authorizable[]> authorizables = generateAuthoriables(12000, 10);
+
+    long start = System.currentTimeMillis();
+    for (Authorizable[] authorizableHierarchy : authorizables) {
+      Set<String> priStrings = cache.listPrivileges(null, null, null, authorizableHierarchy);
+      for (String priString : priStrings) {
+        CommonPrivilege currPri = create(priString);
+        currPri.getParts();
+      }
+    }
+    long end = System.currentTimeMillis();
+
+    System.out.println("TreePrivilegeCache - total time on list string: " + (end - start) + " ms");
+  }
+
+  @Test
+  @Ignore("This test should be run manually.")
+  public void testListPrivilegeObjectsPerf() {
+
+    Set<String> privileges = generatePrivilegeStrings(1000, 10);
+    TreePrivilegeCache cache = new TreePrivilegeCache(privileges, null);
+    List<Authorizable[]> authorizables = generateAuthoriables(12000, 10);
+
+    long start = System.currentTimeMillis();
+    for (Authorizable[] authorizableHierarchy : authorizables) {
+      Set<Privilege> privilegeSet = cache.listPrivilegeObjects(null, null, null, authorizableHierarchy);
+      for (Privilege currPri : privilegeSet) {
+        currPri.getParts();
+      }
+    }
+    long end = System.currentTimeMillis();
+
+    System.out.println("TreePrivilegeCache - total time on list obj: " + (end - start) + " ms");
+  }
+
+  Set<String> generatePrivilegeStrings(int dbCount, int tableCount) {
+    Set<String> priStrings = new HashSet<>();
+    for (int i = 0; i < dbCount; i ++) {
+      for (int j = 0; j < tableCount; j ++) {
+        String priString = "Server=server1->Database=db" + i + "->Table=table" + j;
+        priStrings.add(priString);
+      }
+    }
+
+    return priStrings;
+  }
+
+  List<Authorizable[]> generateAuthoriables(int dbCount, int tableCount) {
+    List<Authorizable[]> authorizables = new ArrayList<>();
+
+    for (int i = 0; i < dbCount; i ++) {
+      for (int j = 0; j < tableCount; j ++) {
+        Authorizable[] authorizable = new Authorizable[3];
+        authorizable[0] = new Server("server1");
+        authorizable[1] = new Database("db" + i);
+        authorizable[2] = new Table("table" + j);
+
+        authorizables.add(authorizable);
+      }
+    }
+
+    return authorizables;
+  }
+
+  static CommonPrivilege create(KeyValue... keyValues) {
+    return create(SentryConstants.AUTHORIZABLE_JOINER.join(keyValues));
+  }
+
+  static CommonPrivilege create(String s) {
+    return new CommonPrivilege(s);
+  }
+
+
+}
diff --git a/sentry-provider/sentry-provider-common/src/main/java/org/apache/sentry/provider/common/CacheProvider.java b/sentry-provider/sentry-provider-common/src/main/java/org/apache/sentry/provider/common/CacheProvider.java
index d50a0bc..19a0f09 100644
--- a/sentry-provider/sentry-provider-common/src/main/java/org/apache/sentry/provider/common/CacheProvider.java
+++ b/sentry-provider/sentry-provider-common/src/main/java/org/apache/sentry/provider/common/CacheProvider.java
@@ -18,6 +18,8 @@
 
 import java.util.Map;
 import java.util.Set;
+import org.apache.sentry.policy.common.CommonPrivilege;
+import org.apache.sentry.policy.common.Privilege;
 
 public class CacheProvider {
   private TableCache cache;
@@ -48,6 +50,27 @@
     return resultBuilder.build();
   }
 
+  public ImmutableSet<Privilege> getPrivilegeObjects(Set<String> groups, Set<String> users,
+      ActiveRoleSet roleSet, Authorizable... authorizableHierarchy) {
+
+    if (!initialized) {
+      throw new IllegalStateException("CacheProvider has not been properly initialized");
+    }
+    ImmutableSet.Builder<Privilege> resultBuilder = ImmutableSet.builder();
+    for (String groupName : groups) {
+      for (Map.Entry<String, Set<String>> row : cache.getCache().row(groupName).entrySet()) {
+        if (roleSet.containsRole(row.getKey())) {
+          // TODO: SENTRY-1245: Filter by Authorizables, if provided
+          Set<String> privilegeStrings = row.getValue();
+          for (String privilegeString : privilegeStrings) {
+            resultBuilder.add(getPrivilegeObject(privilegeString));
+          }
+        }
+      }
+    }
+    return resultBuilder.build();
+  }
+
   public ImmutableSet<String> getRoles(Set<String> groups, ActiveRoleSet roleSet) {
     if (!initialized) {
       throw new IllegalStateException("CacheProvider has not been properly initialized");
@@ -65,4 +88,8 @@
     }
     return resultBuilder.build();
   }
+
+  private Privilege getPrivilegeObject(String priString) {
+    return new CommonPrivilege(priString);
+  }
 }
diff --git a/sentry-provider/sentry-provider-common/src/main/java/org/apache/sentry/provider/common/ProviderBackend.java b/sentry-provider/sentry-provider-common/src/main/java/org/apache/sentry/provider/common/ProviderBackend.java
index b244dba..12b9785 100644
--- a/sentry-provider/sentry-provider-common/src/main/java/org/apache/sentry/provider/common/ProviderBackend.java
+++ b/sentry-provider/sentry-provider-common/src/main/java/org/apache/sentry/provider/common/ProviderBackend.java
@@ -26,6 +26,7 @@
 import org.apache.sentry.core.common.exception.SentryConfigurationException;
 
 import com.google.common.collect.ImmutableSet;
+import org.apache.sentry.policy.common.Privilege;
 
 /**
  * Interface for getting roles from a specific provider backend. Implementations
@@ -52,12 +53,20 @@
   ImmutableSet<String> getPrivileges(Set<String> groups, ActiveRoleSet roleSet, Authorizable... authorizableHierarchy);
 
   /**
-   * Get the privileges from the backend for users and groups.
+   * Get the privileges in string from the backend for users and groups.
    */
   ImmutableSet<String> getPrivileges(Set<String> groups, Set<String> users,
       ActiveRoleSet roleSet, Authorizable... authorizableHierarchy);
 
   /**
+   * Get the privilege objects from the backend for users, groups and authorizables.
+   * If the returned result is empty set, the caller should call getPrivileges() in case its
+   * cache does not support this function
+   */
+  ImmutableSet<Privilege> getPrivilegeObjects(Set<String> groups, Set<String> users,
+      ActiveRoleSet roleSet, Authorizable... authorizableHierarchy);
+
+  /**
    * Get the roles associated with the groups from the backend.
    */
   ImmutableSet<String> getRoles(Set<String> groups, ActiveRoleSet roleSet);
diff --git a/sentry-provider/sentry-provider-common/src/main/java/org/apache/sentry/provider/common/ResourceAuthorizationProvider.java b/sentry-provider/sentry-provider-common/src/main/java/org/apache/sentry/provider/common/ResourceAuthorizationProvider.java
index 222b77a..33bfc88 100644
--- a/sentry-provider/sentry-provider-common/src/main/java/org/apache/sentry/provider/common/ResourceAuthorizationProvider.java
+++ b/sentry-provider/sentry-provider-common/src/main/java/org/apache/sentry/provider/common/ResourceAuthorizationProvider.java
@@ -160,6 +160,12 @@
 
   private Iterable<Privilege> getPrivileges(Set<String> groups, Set<String> users,
       ActiveRoleSet roleSet, Authorizable[] authorizables) {
+    ImmutableSet<Privilege> privilegeObjects = policy.getPrivilegeObjects(groups, users, roleSet, authorizables);
+
+    if (privilegeObjects != null && privilegeObjects.size() > 0) {
+      return appendDefaultDBPrivObject(privilegeObjects, authorizables);
+    }
+
     ImmutableSet<String> privileges = policy.getPrivileges(groups, users, roleSet, authorizables);
     return Iterables.transform(appendDefaultDBPriv(privileges, authorizables),
         new Function<String, Privilege>() {
@@ -193,6 +199,28 @@
     return false;
   }
 
+  private ImmutableSet<Privilege> appendDefaultDBPrivObject(ImmutableSet<Privilege> privileges, Authorizable[] authorizables) {
+    // Only for switch db
+    if (authorizables != null && authorizables.length == 4 && authorizables[2].getName().equals("+")
+        && privileges.size() == 1 && hasOnlyServerPrivilege(privileges.asList().get(0))) {
+      // Assuming authorizable[0] will always be the server
+      // This Code is only reachable when user fires a 'use default'
+      // and the user has a privilege on atleast 1 privilized Object
+      String defaultPrivString = "Server=" + authorizables[0].getName()
+          + "->Db=default->Table=*->Column=*->action=select";
+      Privilege defaultPriv = privilegeFactory.createPrivilege(defaultPrivString);
+      return ImmutableSet.of(defaultPriv);
+    }
+    return privileges;
+  }
+
+  private boolean hasOnlyServerPrivilege(Privilege privObj) {
+    if(privObj.getParts().size() == 1 && privObj.getParts().get(0).getKey().equalsIgnoreCase("server")) {
+      return privObj.getParts().get(0).getValue().endsWith("+");
+    }
+    return false;
+  }
+
   @Override
   public GroupMappingService getGroupMapping() {
     return groupService;
diff --git a/sentry-provider/sentry-provider-common/src/test/java/org/apache/sentry/provider/common/TestGetGroupMapping.java b/sentry-provider/sentry-provider-common/src/test/java/org/apache/sentry/provider/common/TestGetGroupMapping.java
index ccc505f..cf7e1b1 100644
--- a/sentry-provider/sentry-provider-common/src/test/java/org/apache/sentry/provider/common/TestGetGroupMapping.java
+++ b/sentry-provider/sentry-provider-common/src/test/java/org/apache/sentry/provider/common/TestGetGroupMapping.java
@@ -24,6 +24,7 @@
 import org.apache.sentry.core.common.Authorizable;
 import org.apache.sentry.core.common.exception.SentryConfigurationException;
 import org.apache.sentry.policy.common.PolicyEngine;
+import org.apache.sentry.policy.common.Privilege;
 import org.apache.sentry.policy.common.PrivilegeFactory;
 import org.junit.Test;
 
@@ -62,6 +63,13 @@
       }
 
       @Override
+      public ImmutableSet<Privilege> getPrivilegeObjects(Set<String> groups, Set<String> users,
+          ActiveRoleSet roleSet, Authorizable... authorizableHierarchy)
+          throws SentryConfigurationException {
+        return ImmutableSet.of();
+      }
+
+      @Override
       public void validatePolicy(boolean strictValidation)
           throws SentryConfigurationException {
       }
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java
index 277f6b3..88edbe4 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java
@@ -22,6 +22,8 @@
 import org.apache.sentry.core.common.ActiveRoleSet;
 import org.apache.sentry.core.common.Authorizable;
 import org.apache.sentry.core.common.exception.SentryConfigurationException;
+import org.apache.sentry.policy.common.CommonPrivilege;
+import org.apache.sentry.policy.common.Privilege;
 import org.apache.sentry.provider.common.ProviderBackend;
 import org.apache.sentry.provider.common.ProviderBackendContext;
 import org.apache.sentry.api.common.ApiConstants;
@@ -106,6 +108,22 @@
    * {@inheritDoc}
    */
   @Override
+  public ImmutableSet<Privilege> getPrivilegeObjects(Set<String> groups, Set<String> users,
+      ActiveRoleSet roleSet, Authorizable... authorizableHierarchy) {
+    ImmutableSet<String> privilegeStrings = getPrivileges(groups, users, roleSet, authorizableHierarchy);
+
+    ImmutableSet.Builder<Privilege> resultBuilder = ImmutableSet.builder();
+    for (String privilegeString : privilegeStrings) {
+      resultBuilder.add(getPrivilegeObject(privilegeString));
+    }
+
+    return resultBuilder.build();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
   public ImmutableSet<String> getRoles(Set<String> groups, ActiveRoleSet roleSet) {
     throw new UnsupportedOperationException("Not yet implemented.");
   }
@@ -122,5 +140,9 @@
   public void validatePolicy(boolean strictValidation) throws SentryConfigurationException {
   //Noop
   }
+
+  private Privilege getPrivilegeObject(String priString) {
+    return new CommonPrivilege(priString);
+  }
 }
 
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/SentryGenericProviderBackend.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/SentryGenericProviderBackend.java
index f8dc211..25c653f 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/SentryGenericProviderBackend.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/SentryGenericProviderBackend.java
@@ -29,6 +29,7 @@
 import org.apache.sentry.core.common.ActiveRoleSet;
 import org.apache.sentry.core.common.Authorizable;
 import org.apache.sentry.core.common.exception.SentryConfigurationException;
+import org.apache.sentry.policy.common.Privilege;
 import org.apache.sentry.provider.common.CacheProvider;
 import org.apache.sentry.provider.common.ProviderBackend;
 import org.apache.sentry.provider.common.ProviderBackendContext;
@@ -133,6 +134,20 @@
   }
 
   @Override
+  public ImmutableSet<Privilege> getPrivilegeObjects(Set<String> groups, Set<String> users,
+    ActiveRoleSet roleSet, Authorizable... authorizableHierarchy) {
+    if (!initialized) {
+      throw new IllegalStateException("SentryGenericProviderBackend has not been properly initialized");
+    }
+    if (enableCaching) {
+      return super.getPrivilegeObjects(groups, users, roleSet, authorizableHierarchy);
+    }
+
+    // let caller call getPrivileges() then convert the privilege from string to object
+    return ImmutableSet.of();
+  }
+
+  @Override
   public ImmutableSet<String> getRoles(Set<String> groups, ActiveRoleSet roleSet) {
     if (!initialized) {
       throw new IllegalStateException("SentryGenericProviderBackend has not been properly initialized");
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestColumnEndToEnd.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestColumnEndToEnd.java
index 3881692..15cbc5a 100644
--- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestColumnEndToEnd.java
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestColumnEndToEnd.java
@@ -436,6 +436,47 @@
     statement.execute("load data local inpath '" + dataFile.getPath() + "' into table db_1.tb1" );
     statement.execute("use " + DB2);
     statement.execute("CREATE TABLE tb2 (id int, num String)");
+    statement.execute("GRANT SELECT (num) ON TABLE tb2 TO ROLE user_role1");
+    statement.execute("GRANT SELECT  ON TABLE tb2 TO ROLE user_role2");
+    statement.execute("GRANT ROLE user_role1 TO GROUP " + USERGROUP1);
+    statement.execute("GRANT ROLE user_role2 TO GROUP " + USERGROUP2);
+    statement.execute("load data local inpath '" + dataFile.getPath() + "' into table db_2.tb2" );
+    statement.close();
+    connection.close();
+
+    connection =context.createConnection(USER1_1);
+    statement = context.createStatement(connection);
+    statement.execute("use " + DB1);
+    statement.execute("CREATE table db_1.t1 as select tb2.num from db_2.tb2");
+
+    // make sure the async processing is done before test clean up. Otherwise, the test may fail
+    Thread.sleep(1000);
+
+    statement.close();
+    connection.close();
+  }
+
+  @Test
+  public void testCrossDbTableOperations2() throws Exception {
+    //The privilege of user_role1 is used to test create table as select.
+    //The privilege of user_role2 is used to test create view as select.
+    Connection connection = context.createConnection(ADMIN1);
+    Statement statement = context.createStatement(connection);
+    statement.execute("CREATE database " + DB1);
+    statement.execute("CREATE database " + DB2);
+    statement.execute("use " + DB1);
+    statement.execute("CREATE ROLE user_role1");
+    statement.execute("CREATE ROLE user_role2");
+    statement.execute("CREATE TABLE tb1 (id int , name String)");
+    statement.execute("GRANT CREATE ON DATABASE db_1 TO ROLE user_role1");
+    statement.execute("GRANT CREATE ON DATABASE db_1 TO ROLE user_role2");
+    statement.execute("GRANT SELECT (id) ON TABLE tb1 TO ROLE user_role1");
+    statement.execute("GRANT SELECT  ON TABLE tb1 TO ROLE user_role2");
+    statement.execute("GRANT ROLE user_role1 TO GROUP " + USERGROUP1);
+    statement.execute("GRANT ROLE user_role2 TO GROUP " + USERGROUP2);
+    statement.execute("load data local inpath '" + dataFile.getPath() + "' into table db_1.tb1" );
+    statement.execute("use " + DB2);
+    statement.execute("CREATE TABLE tb2 (id int, num String)");
     statement.execute("CREATE TABLE tb3 (id int, val String)");
     statement.execute("GRANT SELECT (num) ON TABLE tb2 TO ROLE user_role1");
     statement.execute("GRANT SELECT (val) ON TABLE tb3 TO ROLE user_role1");
@@ -448,13 +489,6 @@
     statement.close();
     connection.close();
 
-    connection =context.createConnection(USER1_1);
-    statement = context.createStatement(connection);
-    statement.execute("use " + DB1);
-    statement.execute("CREATE table db_1.t1 as select tb1.id, tb2.num from db_1.tb1,db_2.tb2");
-    statement.close();
-    connection.close();
-
     connection =context.createConnection(USER2_1);
     statement = context.createStatement(connection);
     //The db_1.tb1 and db_2.tb3 is same with db_2.tb2.
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationBase.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationBase.java
index 4c09e68..eac2fca 100644
--- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationBase.java
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationBase.java
@@ -177,7 +177,7 @@
               ServerConfig.SENTRY_HMSFOLLOWER_INTERVAL_MILLS_DEFAULT * 2 + CACHE_REFRESH * 2;
 
   protected static long HMSFOLLOWER_INTERVAL_MILLS = 50;
-  protected static long WAIT_FOR_NOTIFICATION_PROCESSING = HMSFOLLOWER_INTERVAL_MILLS * 3;
+  protected static long WAIT_FOR_NOTIFICATION_PROCESSING = HMSFOLLOWER_INTERVAL_MILLS * 5;
 
   // Time to wait before running next tests. The unit is milliseconds.
   // Deleting HDFS may finish, but HDFS may not be ready for creating the same file again.
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/AbstractTestWithStaticConfiguration.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/AbstractTestWithStaticConfiguration.java
index 9045773..86e7d14 100644
--- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/AbstractTestWithStaticConfiguration.java
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/AbstractTestWithStaticConfiguration.java
@@ -141,6 +141,7 @@
   protected static boolean enableAuthorizingObjectStore = true;
   protected static boolean enableAuthorizeReadMetaData = false;
   protected static boolean enableFilter = false;
+  protected static int hmsFollowerIntervalInMilliseconds = 10000;
   // indicate if the database need to be clear for every test case in one test class
   protected static boolean clearDbPerTest = true;
   protected static boolean showDbOnSelectOnly = false;
@@ -509,6 +510,10 @@
   private static void setupSentryService() throws Exception {
 
     sentryConf = new Configuration(true);
+    // HMS is not started in this class, and tests based on this class does not use HMS
+    // set the interval that HMS client contacts HMS to reduce connection exception in log
+    properties.put(ServerConfig.SENTRY_HMSFOLLOWER_INTERVAL_MILLS, String.valueOf(hmsFollowerIntervalInMilliseconds));
+
     properties.put(HiveServerFactory.AUTHZ_PROVIDER_BACKEND,
         SimpleDBProviderBackend.class.getName());
     properties.put(ConfVars.HIVE_AUTHORIZATION_TASK_FACTORY.varname,
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/metastore/TestMetastoreEndToEnd.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/metastore/TestMetastoreEndToEnd.java
index cb201bb..345edc1 100644
--- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/metastore/TestMetastoreEndToEnd.java
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/metastore/TestMetastoreEndToEnd.java
@@ -870,6 +870,14 @@
         Lists.newArrayList(new FieldSchema("col1", "int", "")));
     createMetastoreTable(client, dbName, tabName3,
         Lists.newArrayList(new FieldSchema("col1", "int", "")));
+
+    dropMetastoreDBIfExists(client, dbName2);
+    createMetastoreDB(client, dbName2);
+    createMetastoreTable(client, dbName2, tabName1,
+        Lists.newArrayList(new FieldSchema("col1", "int", "")));
+    createMetastoreTable(client, dbName2, tabName2,
+        Lists.newArrayList(new FieldSchema("col1", "int", "")));
+
     UserGroupInformation clientUgi = UserGroupInformation.createRemoteUser(ADMIN1);
     tableNames = clientUgi.doAs(new PrivilegedExceptionAction<List<String>>() {
       @Override
@@ -880,12 +888,6 @@
     assertThat(tableNames).isNotNull();
     assertThat(tableNames.size()).isEqualTo(3);
 
-    dropMetastoreDBIfExists(client, dbName2);
-    createMetastoreDB(client, dbName2);
-    createMetastoreTable(client, dbName2, tabName1,
-        Lists.newArrayList(new FieldSchema("col1", "int", "")));
-    createMetastoreTable(client, dbName2, tabName2,
-        Lists.newArrayList(new FieldSchema("col1", "int", "")));
     tableNames = clientUgi.doAs(new PrivilegedExceptionAction<List<String>>() {
       @Override
       public List<String> run() throws Exception {
@@ -894,7 +896,6 @@
     });
     assertThat(tableNames).isNotNull();
     assertThat(tableNames.size()).isEqualTo(2);
-    client.close();
 
     // Verify a user with ALL privileges on a database can get its name
     // and cannot get database name that has no privilege on
@@ -959,7 +960,6 @@
     });
     assertThat(tableNames).isNotNull();
     assertThat(tableNames.size()).isEqualTo(0);
-    client.close();
 
     // USER4_1 ALL on dbName.tabName1 and dbName2
     final HiveMetaStoreClient  client_USER4_1 = context.getMetaStoreClient(USER4_1);
@@ -971,7 +971,7 @@
       }
     });
     assertThat(tableNames).isNotNull();
-    assertThat(tableNames.size()).isEqualTo(1); // only has access to tabName1 and tabName2
+    assertThat(tableNames.size()).isEqualTo(1);
     assertThat(tableNames.get(0)).isEqualToIgnoringCase(tabName1);
     tableNames = clientUgi_USER4_1.doAs(new PrivilegedExceptionAction<List<String>>() {
       @Override
@@ -981,7 +981,6 @@
     });
     assertThat(tableNames).isNotNull();
     assertThat(tableNames.size()).isEqualTo(2);
-    client.close();
 
     // USER5_1 CREATE on server
     final HiveMetaStoreClient  client_USER5_1 = context.getMetaStoreClient(USER5_1);
@@ -1002,6 +1001,12 @@
     });
     assertThat(tableNames).isNotNull();
     assertThat(tableNames.size()).isEqualTo(2);
+
     client.close();
+    client_USER1_1.close();
+    client_USER2_1.close();
+    client_USER3_1.close();
+    client_USER4_1.close();
+    client_USER5_1.close();
   }
 }