IMPALA-9195: Using multithreaded execution to accelerate 'show tables/databases'

If Sentry authorization is enabled, users with multi group-policies
will take time to get the result of 'show tables/databases'. It seems
that ResourceAuthorizationProvider.hasAccess performs bad for users
with complex group-policies, IMPALA-9242 will target to address this
problem.

This patch provides a config option 'num_check_authorization_threads' to
accelerate 'show tables/databases' by using multithreading. This configuration
is applicable only when authorization is enabled. A value of 1 disables
multi-threaded execution for checking access. However, a small value of larger
than 1 may limit the parallism of FE requests when checking authorization with
a high concurrency. The value must be in the range of 1 to 128. The default
value of 'num_check_access_threads' is 1.

Change-Id: I860e0d18afa0421665f8b3b1c5561d6bdacc5e96
Reviewed-on: http://gerrit.cloudera.org:8080/14846
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 451da40..40841b5 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -300,6 +300,14 @@
     "may result in false positives"
     "This overrides KRPC's --rpc_duration_too_long_ms setting.");
 
+DEFINE_int32(num_check_authorization_threads, 1,
+    "The number of threads used to check authorization for the user when executing show "
+    "tables/databases. This configuration is applicable only when authorization is "
+    "enabled. A value of 1 disables multi-threaded execution for checking authorization."
+    "However, a small value of larger than 1 may limit the parallism of FE requests when "
+    "checking authorization with a high concurrency. The value must be in the range of "
+    "1 to 128.");
+
 // ++========================++
 // || Startup flag graveyard ||
 // ++========================++
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 6906e1d..467873b 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -83,6 +83,7 @@
 DECLARE_string(blacklisted_tables);
 DECLARE_string(min_privilege_set_for_show_stmts);
 DECLARE_int32(num_expected_executors);
+DECLARE_int32(num_check_authorization_threads);
 
 namespace impala {
 
@@ -169,6 +170,7 @@
   cfg.__set_blacklisted_tables(FLAGS_blacklisted_tables);
   cfg.__set_min_privilege_set_for_show_stmts(FLAGS_min_privilege_set_for_show_stmts);
   cfg.__set_num_expected_executors(FLAGS_num_expected_executors);
+  cfg.__set_num_check_authorization_threads(FLAGS_num_check_authorization_threads);
   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
   return Status::OK();
 }
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 3489793..4762654 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -147,4 +147,6 @@
   61: required bool mt_dop_auto_fallback
 
   62: required i32 num_expected_executors
+
+  63: required i32 num_check_authorization_threads
 }
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 47c4660..d3abf78 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -197,6 +197,10 @@
     return backendCfg_.min_privilege_set_for_show_stmts;
   }
 
+  public int getNumCheckAuthorizationThreads() {
+    return backendCfg_.num_check_authorization_threads;
+  }
+
   // Inits the auth_to_local configuration in the static KerberosName class.
   private static void initAuthToLocal() {
     // If auth_to_local is enabled, we read the configuration hadoop.security.auth_to_local
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 5c36f37..f01d787 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -33,6 +33,9 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -172,6 +175,7 @@
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 
 /**
@@ -193,6 +197,10 @@
   private static final int INCONSISTENT_METADATA_NUM_RETRIES =
       BackendConfig.INSTANCE.getLocalCatalogMaxFetchRetries();
 
+  // Maximum number of threads used to check authorization for the user when executing
+  // show tables/databases.
+  private static final int MAX_CHECK_AUTHORIZATION_POOL_SIZE = 128;
+
   /**
    * Plan-time context that allows capturing various artifacts created
    * during the process.
@@ -271,6 +279,8 @@
 
   private final TransactionKeepalive transactionKeepalive_;
 
+  private static ExecutorService checkAuthorizationPool_;
+
   public Frontend(AuthorizationFactory authzFactory) throws ImpalaException {
     this(authzFactory, FeCatalogManager.createFromBackendConfig());
   }
@@ -294,6 +304,15 @@
     if (authzConfig.isEnabled()) {
       authzChecker_.set(authzFactory.newAuthorizationChecker(
           getCatalog().getAuthPolicy()));
+      int numThreads = BackendConfig.INSTANCE.getNumCheckAuthorizationThreads();
+      Preconditions.checkState(numThreads > 0
+        && numThreads <= MAX_CHECK_AUTHORIZATION_POOL_SIZE);
+      if (numThreads == 1) {
+        checkAuthorizationPool_ = MoreExecutors.sameThreadExecutor();
+      } else {
+        LOG.info("Using a thread pool of size {} for authorization", numThreads);
+        checkAuthorizationPool_ = Executors.newFixedThreadPool(numThreads);
+      }
     } else {
       authzChecker_.set(authzFactory.newAuthorizationChecker());
     }
@@ -776,6 +795,31 @@
   }
 
   /**
+   * A Callable wrapper used for checking authorization to tables/databases.
+   */
+  private class CheckAuthorization implements Callable<Boolean> {
+    private final String dbName_;
+    private final String tblName_;
+    private final String owner_;
+    private final User user_;
+
+    public CheckAuthorization(String dbName, String tblName, String owner, User user) {
+      // dbName and user cannot be null, tblName and owner can be null.
+      Preconditions.checkNotNull(dbName);
+      Preconditions.checkNotNull(user);
+      dbName_ = dbName;
+      tblName_ = tblName;
+      owner_ = owner;
+      user_ = user;
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+      return new Boolean(isAccessibleToUser(dbName_, tblName_, owner_, user_));
+    }
+  }
+
+  /**
    * Returns all tables in database 'dbName' that match the pattern of 'matcher' and are
    * accessible to 'user'.
    */
@@ -792,11 +836,40 @@
     }
   }
 
+  /**
+   * This method filters out elements from the given list based on the the results
+   * of the pendingCheckTasks.
+   */
+  private void filterUnaccessibleElements(List<Future<Boolean>> pendingCheckTasks,
+    List<?> checkList) throws InternalException {
+    int failedCheckTasks = 0;
+    int index = 0;
+    Iterator<?> iter = checkList.iterator();
+
+    Preconditions.checkState(checkList.size() == pendingCheckTasks.size());
+    while (iter.hasNext()) {
+      iter.next();
+      try {
+        if (!pendingCheckTasks.get(index).get()) iter.remove();
+        index++;
+      } catch (ExecutionException | InterruptedException e) {
+        failedCheckTasks++;
+        LOG.error("Encountered an error checking access", e);
+        break;
+      }
+    }
+
+    if (failedCheckTasks > 0)
+      throw new InternalException("Failed to check access." +
+          "Check the server log for more details.");
+  }
+
   private List<String> doGetTableNames(String dbName, PatternMatcher matcher,
       User user) throws ImpalaException {
     FeCatalog catalog = getCatalog();
     List<String> tblNames = catalog.getTableNames(dbName, matcher);
     if (authzFactory_.getAuthorizationConfig().isEnabled()) {
+      List<Future<Boolean>> pendingCheckTasks = Lists.newArrayList();
       Iterator<String> iter = tblNames.iterator();
       while (iter.hasNext()) {
         String tblName = iter.next();
@@ -811,18 +884,15 @@
         String tableOwner = table.getOwnerUser();
         if (tableOwner == null) {
           LOG.info("Table {} not yet loaded, ignoring it in table listing.",
-              dbName + "." + tblName);
+            dbName + "." + tblName);
         }
-        Set<PrivilegeRequest> requests = new PrivilegeRequestBuilder(
-            authzFactory_.getAuthorizableFactory())
-            .anyOf(minPrivilegeSetForShowStmts_)
-            .onAnyColumn(dbName, tblName, tableOwner)
-            .buildSet();
-        if (!authzChecker_.get().hasAnyAccess(user, requests)) {
-          iter.remove();
-        }
+        pendingCheckTasks.add(checkAuthorizationPool_.submit(
+            new CheckAuthorization(dbName, tblName, tableOwner, user)));
       }
+
+      filterUnaccessibleElements(pendingCheckTasks, tblNames);
     }
+
     return tblNames;
   }
 
@@ -944,29 +1014,43 @@
     // have permissions on.
     if (authzFactory_.getAuthorizationConfig().isEnabled()) {
       Iterator<? extends FeDb> iter = dbs.iterator();
+      List<Future<Boolean>> pendingCheckTasks = Lists.newArrayList();
       while (iter.hasNext()) {
         FeDb db = iter.next();
-        if (!isAccessibleToUser(db, user)) iter.remove();
+        pendingCheckTasks.add(checkAuthorizationPool_.submit(
+            new CheckAuthorization(db.getName(), null, db.getOwnerUser(), user)));
       }
+
+      filterUnaccessibleElements(pendingCheckTasks, dbs);
     }
+
     return dbs;
   }
 
   /**
-   * Check whether database is accessible to given user.
+   * Check whether table/database is accessible to given user.
    */
-  private boolean isAccessibleToUser(FeDb db, User user)
-      throws InternalException {
-    if (db.getName().toLowerCase().equals(Catalog.DEFAULT_DB.toLowerCase())) {
+  private boolean isAccessibleToUser(String dbName, String tblName,
+      String owner, User user) throws InternalException {
+    Preconditions.checkNotNull(dbName);
+    if (tblName == null &&
+        dbName.toLowerCase().equals(Catalog.DEFAULT_DB.toLowerCase())) {
       // Default DB should always be shown.
       return true;
     }
-    Set<PrivilegeRequest> requests = new PrivilegeRequestBuilder(
-        authzFactory_.getAuthorizableFactory())
-        .anyOf(minPrivilegeSetForShowStmts_)
-        .onAnyColumn(db.getName(), db.getOwnerUser())
-        .buildSet();
-    return authzChecker_.get().hasAnyAccess(user, requests);
+
+    PrivilegeRequestBuilder builder = new PrivilegeRequestBuilder(
+      authzFactory_.getAuthorizableFactory())
+      .anyOf(minPrivilegeSetForShowStmts_);
+    if (tblName == null) {
+      // Check database
+      builder = builder.onAnyColumn(dbName, owner);
+    } else {
+      // Check table
+      builder = builder.onAnyColumn(dbName, tblName, owner);
+    }
+
+    return authzChecker_.get().hasAnyAccess(user, builder.buildSet());
   }
 
   /**
diff --git a/tests/authorization/test_authorization.py b/tests/authorization/test_authorization.py
index 885d184..7f57ddb 100644
--- a/tests/authorization/test_authorization.py
+++ b/tests/authorization/test_authorization.py
@@ -22,6 +22,7 @@
 import tempfile
 import grp
 import re
+import random
 import sys
 import subprocess
 import urllib
@@ -651,3 +652,26 @@
                   "--ranger_app_id=impala --authorization_provider=ranger")
   def test_ranger_show_stmts_with_any(self, unique_name):
     self._test_ranger_show_stmts_helper(unique_name, PRIVILEGES)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--server-name=server1 --ranger_service_type=hive "
+                 "--ranger_app_id=impala --authorization_provider=ranger "
+                 "--num_check_authorization_threads=%d" % (random.randint(2, 128)),
+    catalogd_args="--server-name=server1 --ranger_service_type=hive "
+                  "--ranger_app_id=impala --authorization_provider=ranger")
+  def test_num_check_authorization_threads_with_ranger(self, unique_name):
+    self._test_ranger_show_stmts_helper(unique_name, PRIVILEGES)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--server_name=server1 --sentry_config=%s "
+                 "--authorized_proxy_user_config=%s=* "
+                 "--num_check_authorization_threads=%d" %
+                 (SENTRY_CONFIG_FILE, getuser(), random.randint(2, 128)),
+    catalogd_args="--sentry_config={0}".format(SENTRY_CONFIG_FILE),
+    sentry_config=SENTRY_CONFIG_FILE_OO,  # Enable Sentry Object Ownership
+    sentry_log_dir="{0}/test_num_check_authorization_threads_with_sentry"
+                   .format(SENTRY_BASE_LOG_DIR))
+  def test_num_check_authorization_threads_with_sentry(self, unique_role, unique_name):
+    self._test_sentry_show_stmts_helper(unique_role, unique_name, PRIVILEGES)