IMPALA-9195: Using multithreaded execution to accelerate 'show tables/databases'
If Sentry authorization is enabled, users with many group policies
may wait a long time for the result of 'show tables/databases'. It seems
that ResourceAuthorizationProvider.hasAccess performs poorly for users
with complex group policies; IMPALA-9242 is intended to address that
problem.
This patch provides a config option 'num_check_authorization_threads' to
accelerate 'show tables/databases' by using multithreading. This configuration
is applicable only when authorization is enabled. A value of 1 disables
multi-threaded execution for checking access. However, a small value larger
than 1 may limit the parallelism of FE requests when checking authorization with
a high concurrency. The value must be in the range of 1 to 128. The default
value of 'num_check_authorization_threads' is 1.
Change-Id: I860e0d18afa0421665f8b3b1c5561d6bdacc5e96
Reviewed-on: http://gerrit.cloudera.org:8080/14846
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 451da40..40841b5 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -300,6 +300,14 @@
"may result in false positives"
"This overrides KRPC's --rpc_duration_too_long_ms setting.");
+DEFINE_int32(num_check_authorization_threads, 1,
+ "The number of threads used to check authorization for the user when executing show "
+ "tables/databases. This configuration is applicable only when authorization is "
+ "enabled. A value of 1 disables multi-threaded execution for checking authorization. "
+ "However, a small value larger than 1 may limit the parallelism of FE requests when "
+ "checking authorization with a high concurrency. The value must be in the range of "
+ "1 to 128.");
+
// ++========================++
// || Startup flag graveyard ||
// ++========================++
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 6906e1d..467873b 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -83,6 +83,7 @@
DECLARE_string(blacklisted_tables);
DECLARE_string(min_privilege_set_for_show_stmts);
DECLARE_int32(num_expected_executors);
+DECLARE_int32(num_check_authorization_threads);
namespace impala {
@@ -169,6 +170,7 @@
cfg.__set_blacklisted_tables(FLAGS_blacklisted_tables);
cfg.__set_min_privilege_set_for_show_stmts(FLAGS_min_privilege_set_for_show_stmts);
cfg.__set_num_expected_executors(FLAGS_num_expected_executors);
+ cfg.__set_num_check_authorization_threads(FLAGS_num_check_authorization_threads);
RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
return Status::OK();
}
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 3489793..4762654 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -147,4 +147,6 @@
61: required bool mt_dop_auto_fallback
62: required i32 num_expected_executors
+
+ 63: required i32 num_check_authorization_threads
}
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 47c4660..d3abf78 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -197,6 +197,10 @@
return backendCfg_.min_privilege_set_for_show_stmts;
}
+ public int getNumCheckAuthorizationThreads() {
+ return backendCfg_.num_check_authorization_threads;
+ }
+
// Inits the auth_to_local configuration in the static KerberosName class.
private static void initAuthToLocal() {
// If auth_to_local is enabled, we read the configuration hadoop.security.auth_to_local
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 5c36f37..f01d787 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -33,6 +33,9 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -172,6 +175,7 @@
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
/**
@@ -193,6 +197,10 @@
private static final int INCONSISTENT_METADATA_NUM_RETRIES =
BackendConfig.INSTANCE.getLocalCatalogMaxFetchRetries();
+ // Maximum number of threads used to check authorization for the user when executing
+ // show tables/databases.
+ private static final int MAX_CHECK_AUTHORIZATION_POOL_SIZE = 128;
+
/**
* Plan-time context that allows capturing various artifacts created
* during the process.
@@ -271,6 +279,8 @@
private final TransactionKeepalive transactionKeepalive_;
+ private static ExecutorService checkAuthorizationPool_;
+
public Frontend(AuthorizationFactory authzFactory) throws ImpalaException {
this(authzFactory, FeCatalogManager.createFromBackendConfig());
}
@@ -294,6 +304,15 @@
if (authzConfig.isEnabled()) {
authzChecker_.set(authzFactory.newAuthorizationChecker(
getCatalog().getAuthPolicy()));
+ int numThreads = BackendConfig.INSTANCE.getNumCheckAuthorizationThreads();
+ Preconditions.checkState(numThreads > 0
+ && numThreads <= MAX_CHECK_AUTHORIZATION_POOL_SIZE);
+ if (numThreads == 1) {
+ checkAuthorizationPool_ = MoreExecutors.sameThreadExecutor();
+ } else {
+ LOG.info("Using a thread pool of size {} for authorization", numThreads);
+ checkAuthorizationPool_ = Executors.newFixedThreadPool(numThreads);
+ }
} else {
authzChecker_.set(authzFactory.newAuthorizationChecker());
}
@@ -776,6 +795,31 @@
}
/**
+ * A Callable wrapper used for checking authorization to tables/databases.
+ */
+ private class CheckAuthorization implements Callable<Boolean> {
+ private final String dbName_;
+ private final String tblName_;
+ private final String owner_;
+ private final User user_;
+
+ public CheckAuthorization(String dbName, String tblName, String owner, User user) {
+ // dbName and user cannot be null, tblName and owner can be null.
+ Preconditions.checkNotNull(dbName);
+ Preconditions.checkNotNull(user);
+ dbName_ = dbName;
+ tblName_ = tblName;
+ owner_ = owner;
+ user_ = user;
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+ return Boolean.valueOf(isAccessibleToUser(dbName_, tblName_, owner_, user_));
+ }
+ }
+
+ /**
* Returns all tables in database 'dbName' that match the pattern of 'matcher' and are
* accessible to 'user'.
*/
@@ -792,11 +836,40 @@
}
}
+ /**
+ * Removes from 'checkList' every element whose task in 'pendingCheckTasks' (matched
+ * by position) returned false. Both lists must have the same size. Throws
+ * InternalException if a task fails or the wait for its result is interrupted.
+ */
+ private void filterUnaccessibleElements(List<Future<Boolean>> pendingCheckTasks,
+ List<?> checkList) throws InternalException {
+ Preconditions.checkState(checkList.size() == pendingCheckTasks.size());
+
+ int index = 0;
+ Iterator<?> iter = checkList.iterator();
+ while (iter.hasNext()) {
+ iter.next();
+ try {
+ // Blocks until the check for this element has completed.
+ if (!pendingCheckTasks.get(index).get()) iter.remove();
+ index++;
+ } catch (ExecutionException | InterruptedException e) {
+ // Restore the interrupt status so that callers can still observe it.
+ if (e instanceof InterruptedException) Thread.currentThread().interrupt();
+ LOG.error("Encountered an error checking access", e);
+ // Stop at the first failure; elements not yet examined are left in the list.
+ throw new InternalException("Failed to check access. " +
+ "Check the server log for more details.");
+ }
+ }
+ }
+
private List<String> doGetTableNames(String dbName, PatternMatcher matcher,
User user) throws ImpalaException {
FeCatalog catalog = getCatalog();
List<String> tblNames = catalog.getTableNames(dbName, matcher);
if (authzFactory_.getAuthorizationConfig().isEnabled()) {
+ List<Future<Boolean>> pendingCheckTasks = Lists.newArrayList();
Iterator<String> iter = tblNames.iterator();
while (iter.hasNext()) {
String tblName = iter.next();
@@ -811,18 +884,15 @@
String tableOwner = table.getOwnerUser();
if (tableOwner == null) {
LOG.info("Table {} not yet loaded, ignoring it in table listing.",
- dbName + "." + tblName);
+ dbName + "." + tblName);
}
- Set<PrivilegeRequest> requests = new PrivilegeRequestBuilder(
- authzFactory_.getAuthorizableFactory())
- .anyOf(minPrivilegeSetForShowStmts_)
- .onAnyColumn(dbName, tblName, tableOwner)
- .buildSet();
- if (!authzChecker_.get().hasAnyAccess(user, requests)) {
- iter.remove();
- }
+ pendingCheckTasks.add(checkAuthorizationPool_.submit(
+ new CheckAuthorization(dbName, tblName, tableOwner, user)));
}
+
+ filterUnaccessibleElements(pendingCheckTasks, tblNames);
}
+
return tblNames;
}
@@ -944,29 +1014,43 @@
// have permissions on.
if (authzFactory_.getAuthorizationConfig().isEnabled()) {
Iterator<? extends FeDb> iter = dbs.iterator();
+ List<Future<Boolean>> pendingCheckTasks = Lists.newArrayList();
while (iter.hasNext()) {
FeDb db = iter.next();
- if (!isAccessibleToUser(db, user)) iter.remove();
+ pendingCheckTasks.add(checkAuthorizationPool_.submit(
+ new CheckAuthorization(db.getName(), null, db.getOwnerUser(), user)));
}
+
+ filterUnaccessibleElements(pendingCheckTasks, dbs);
}
+
return dbs;
}
/**
- * Check whether database is accessible to given user.
+ * Checks whether the table, or the database when 'tblName' is null, is accessible to 'user'.
*/
- private boolean isAccessibleToUser(FeDb db, User user)
- throws InternalException {
- if (db.getName().toLowerCase().equals(Catalog.DEFAULT_DB.toLowerCase())) {
+ private boolean isAccessibleToUser(String dbName, String tblName,
+ String owner, User user) throws InternalException {
+ Preconditions.checkNotNull(dbName);
+ if (tblName == null &&
+ dbName.toLowerCase().equals(Catalog.DEFAULT_DB.toLowerCase())) {
// Default DB should always be shown.
return true;
}
- Set<PrivilegeRequest> requests = new PrivilegeRequestBuilder(
- authzFactory_.getAuthorizableFactory())
- .anyOf(minPrivilegeSetForShowStmts_)
- .onAnyColumn(db.getName(), db.getOwnerUser())
- .buildSet();
- return authzChecker_.get().hasAnyAccess(user, requests);
+
+ PrivilegeRequestBuilder builder = new PrivilegeRequestBuilder(
+ authzFactory_.getAuthorizableFactory())
+ .anyOf(minPrivilegeSetForShowStmts_);
+ if (tblName == null) {
+ // No table name given: check the database itself.
+ builder = builder.onAnyColumn(dbName, owner);
+ } else {
+ // Table name given: check that table.
+ builder = builder.onAnyColumn(dbName, tblName, owner);
+ }
+
+ return authzChecker_.get().hasAnyAccess(user, builder.buildSet());
}
/**
diff --git a/tests/authorization/test_authorization.py b/tests/authorization/test_authorization.py
index 885d184..7f57ddb 100644
--- a/tests/authorization/test_authorization.py
+++ b/tests/authorization/test_authorization.py
@@ -22,6 +22,7 @@
import tempfile
import grp
import re
+import random
import sys
import subprocess
import urllib
@@ -651,3 +652,26 @@
"--ranger_app_id=impala --authorization_provider=ranger")
def test_ranger_show_stmts_with_any(self, unique_name):
self._test_ranger_show_stmts_helper(unique_name, PRIVILEGES)
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args="--server-name=server1 --ranger_service_type=hive "
+ "--ranger_app_id=impala --authorization_provider=ranger "
+ "--num_check_authorization_threads=%d" % (random.randint(2, 128)),
+ catalogd_args="--server-name=server1 --ranger_service_type=hive "
+ "--ranger_app_id=impala --authorization_provider=ranger")
+ def test_num_check_authorization_threads_with_ranger(self, unique_name):
+ self._test_ranger_show_stmts_helper(unique_name, PRIVILEGES)
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args="--server_name=server1 --sentry_config=%s "
+ "--authorized_proxy_user_config=%s=* "
+ "--num_check_authorization_threads=%d" %
+ (SENTRY_CONFIG_FILE, getuser(), random.randint(2, 128)),
+ catalogd_args="--sentry_config={0}".format(SENTRY_CONFIG_FILE),
+ sentry_config=SENTRY_CONFIG_FILE_OO, # Enable Sentry Object Ownership
+ sentry_log_dir="{0}/test_num_check_authorization_threads_with_sentry"
+ .format(SENTRY_BASE_LOG_DIR))
+ def test_num_check_authorization_threads_with_sentry(self, unique_role, unique_name):
+ self._test_sentry_show_stmts_helper(unique_role, unique_name, PRIVILEGES)