[branch-2.9][fix][security] Add timeout of sync methods and avoid call sync method for AuthoriationService (#16083)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 6943e95..ae36a9b 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -43,6 +43,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -396,11 +397,15 @@
AuthenticationDataSource authData) {
try {
return allowTenantOperationAsync(
- tenantName, operation, originalRole, role, authData).get();
+ tenantName, operation, originalRole, role, authData).get(
+ conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new RestException(e);
} catch (ExecutionException e) {
throw new RestException(e.getCause());
+ } catch (TimeoutException e) {
+ throw new RestException(e);
}
}
@@ -521,11 +526,15 @@
AuthenticationDataSource authData) {
try {
return allowNamespacePolicyOperationAsync(
- namespaceName, policy, operation, originalRole, role, authData).get();
+ namespaceName, policy, operation, originalRole, role, authData).get(
+ conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new RestException(e);
} catch (ExecutionException e) {
throw new RestException(e.getCause());
+ } catch (TimeoutException e) {
+ throw new RestException(e);
}
}
@@ -585,11 +594,15 @@
AuthenticationDataSource authData) {
try {
return allowTopicPolicyOperationAsync(
- topicName, policy, operation, originalRole, role, authData).get();
+ topicName, policy, operation, originalRole, role, authData).get(
+ conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new RestException(e);
} catch (ExecutionException e) {
throw new RestException(e.getCause());
+ } catch (TimeoutException e) {
+ throw new RestException(e);
}
}
@@ -667,9 +680,10 @@
TopicOperation operation,
String originalRole,
String role,
- AuthenticationDataSource authData) {
+ AuthenticationDataSource authData) throws Exception {
try {
- return allowTopicOperationAsync(topicName, operation, originalRole, role, authData).get();
+ return allowTopicOperationAsync(topicName, operation, originalRole, role, authData).get(
+ conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
throw new RestException(e);
} catch (ExecutionException e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index b28a510..4ff236a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -233,7 +233,7 @@
validateAdminAccessForTenant(topicName.getTenant());
} catch (Exception ve) {
try {
- checkAuthorization(pulsar(), topicName, clientAppId(), clientAuthData());
+ checkAuthorizationAsync(pulsar(), topicName, clientAppId(), clientAuthData());
} catch (RestException re) {
throw re;
} catch (Exception e) {
@@ -3559,46 +3559,55 @@
PulsarService pulsar, String clientAppId, String originalPrincipal,
AuthenticationDataSource authenticationData, TopicName topicName) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
- try {
- // (1) authorize client
- try {
- checkAuthorization(pulsar, topicName, clientAppId, authenticationData);
- } catch (RestException e) {
- try {
- validateAdminAccessForTenant(pulsar,
- clientAppId, originalPrincipal, topicName.getTenant(), authenticationData);
- } catch (RestException authException) {
- log.warn("Failed to authorize {} on cluster {}", clientAppId, topicName.toString());
- throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
- clientAppId, topicName.toString(), authException.getMessage()));
- }
- } catch (Exception ex) {
- // throw without wrapping to PulsarClientException that considers: unknown error marked as internal
- // server error
- log.warn("Failed to authorize {} on cluster {} with unexpected exception {}", clientAppId,
- topicName.toString(), ex.getMessage(), ex);
- throw ex;
- }
+ CompletableFuture<Void> authorizationFuture = new CompletableFuture<>();
+ checkAuthorizationAsync(pulsar, topicName, clientAppId, authenticationData)
+ .thenRun(() -> authorizationFuture.complete(null))
+ .exceptionally(e -> {
+ Throwable throwable = FutureUtil.unwrapCompletionException(e);
+ if (throwable instanceof RestException) {
+ validateAdminAccessForTenantAsync(pulsar,
+ clientAppId, originalPrincipal, topicName.getTenant(), authenticationData)
+ .thenRun(() -> {
+ authorizationFuture.complete(null);
+ }).exceptionally(ex -> {
+ Throwable throwable2 = FutureUtil.unwrapCompletionException(ex);
+ if (throwable2 instanceof RestException) {
+ log.warn("Failed to authorize {} on topic {}", clientAppId, topicName);
+ authorizationFuture.completeExceptionally(new PulsarClientException(
+ String.format("Authorization failed %s on topic %s with error %s",
+ clientAppId, topicName, throwable2.getMessage())));
+ } else {
+ authorizationFuture.completeExceptionally(throwable2);
+ }
+ return null;
+ });
+ } else {
+ // throw without wrapping to PulsarClientException that considers: unknown error marked as
+ // internal server error
+ log.warn("Failed to authorize {} on topic {}", clientAppId, topicName, throwable);
+ authorizationFuture.completeExceptionally(throwable);
+ }
+ return null;
+ });
- // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
- // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
- // producer/consumer
- checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())
- .thenCompose(res -> pulsar.getBrokerService()
- .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
- .thenAccept(metadata -> {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
- metadata.partitions);
- }
- metadataFuture.complete(metadata);
- }).exceptionally(ex -> {
- metadataFuture.completeExceptionally(ex.getCause());
- return null;
- });
- } catch (Exception ex) {
- metadataFuture.completeExceptionally(ex);
- }
+ // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
+ // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
+ // producer/consumer
+ authorizationFuture.thenCompose(__ ->
+ checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject()))
+ .thenCompose(res ->
+ pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
+ .thenAccept(metadata -> {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
+ metadata.partitions);
+ }
+ metadataFuture.complete(metadata);
+ })
+ .exceptionally(e -> {
+ metadataFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
+ return null;
+ });
return metadataFuture;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index dab1b29..967059c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -48,6 +48,7 @@
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -219,23 +220,14 @@
cluster);
}
validationFuture.complete(newLookupResponse(differentClusterData.getBrokerServiceUrl(),
- differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId, false));
+ differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
+ requestId, false));
} else {
// (2) authorize client
- try {
- checkAuthorization(pulsarService, topicName, clientAppId, authenticationData);
- } catch (RestException authException) {
- log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName.toString());
- validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
- authException.getMessage(), requestId));
- return;
- } catch (Exception e) {
- log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName.toString());
- validationFuture.completeExceptionally(e);
- return;
- }
- // (3) validate global namespace
- checkLocalOrGetPeerReplicationCluster(pulsarService, topicName.getNamespaceObject())
+ checkAuthorizationAsync(pulsarService, topicName, clientAppId, authenticationData).thenRun(() -> {
+ // (3) validate global namespace
+ checkLocalOrGetPeerReplicationCluster(pulsarService,
+ topicName.getNamespaceObject())
.thenAccept(peerClusterData -> {
if (peerClusterData == null) {
// (4) all validation passed: initiate lookup
@@ -247,21 +239,36 @@
if (StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
&& StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
- "Redirected cluster's brokerService url is not configured", requestId));
+ "Redirected cluster's brokerService url is not configured",
+ requestId));
return;
}
validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
- peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId,
+ peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
+ requestId,
false));
-
}).exceptionally(ex -> {
- validationFuture.complete(
- newLookupErrorResponse(ServerError.MetadataError, ex.getMessage(), requestId));
- return null;
- });
+ validationFuture.complete(
+ newLookupErrorResponse(ServerError.MetadataError,
+ FutureUtil.unwrapCompletionException(ex).getMessage(), requestId));
+ return null;
+ });
+ })
+ .exceptionally(e -> {
+ Throwable throwable = FutureUtil.unwrapCompletionException(e);
+ if (throwable instanceof RestException) {
+ log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName);
+ validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
+ throwable.getMessage(), requestId));
+ } else {
+ log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName);
+ validationFuture.completeExceptionally(throwable);
+ }
+ return null;
+ });
}
}).exceptionally(ex -> {
- validationFuture.completeExceptionally(ex);
+ validationFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
return null;
});
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 7b5f455..20652d1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -245,6 +245,86 @@
}
}
+ protected static CompletableFuture<Void> validateAdminAccessForTenantAsync(
+ PulsarService pulsar, String clientAppId,
+ String originalPrincipal, String tenant,
+ AuthenticationDataSource authenticationData) {
+ if (log.isDebugEnabled()) {
+ log.debug("check admin access on tenant: {} - Authenticated: {} -- role: {}", tenant,
+ (isClientAuthenticated(clientAppId)), clientAppId);
+ }
+ return pulsar.getPulsarResources().getTenantResources().getTenantAsync(tenant)
+ .thenCompose(tenantInfoOptional -> {
+ if (!tenantInfoOptional.isPresent()) {
+ throw new RestException(Status.NOT_FOUND, "Tenant does not exist");
+ }
+ TenantInfo tenantInfo = tenantInfoOptional.get();
+ if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration()
+ .isAuthorizationEnabled()) {
+ if (!isClientAuthenticated(clientAppId)) {
+ throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
+ }
+ validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), clientAppId,
+ originalPrincipal);
+ if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
+ AuthorizationService authorizationService =
+ pulsar.getBrokerService().getAuthorizationService();
+ return authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo,
+ authenticationData)
+ .thenCompose(isTenantAdmin -> {
+ String debugMsg = "Successfully authorized {} (proxied by {}) on tenant {}";
+ if (!isTenantAdmin) {
+ return authorizationService.isSuperUser(clientAppId, authenticationData)
+ .thenCombine(authorizationService.isSuperUser(originalPrincipal,
+ authenticationData),
+ (proxyAuthorized, originalPrincipalAuthorized) -> {
+ if (!proxyAuthorized || !originalPrincipalAuthorized) {
+ throw new RestException(Status.UNAUTHORIZED,
+ String.format(
+ "Proxy not authorized to access "
+ + "resource (proxy:%s,original:%s)"
+ , clientAppId, originalPrincipal));
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(debugMsg, originalPrincipal,
+ clientAppId, tenant);
+ }
+ return null;
+ }
+ });
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(debugMsg, originalPrincipal, clientAppId, tenant);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+ });
+ } else {
+ return pulsar.getBrokerService()
+ .getAuthorizationService()
+ .isSuperUser(clientAppId, authenticationData)
+ .thenCompose(isSuperUser -> {
+ if (!isSuperUser) {
+ return pulsar.getBrokerService().getAuthorizationService()
+ .isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData);
+ } else {
+ return CompletableFuture.completedFuture(true);
+ }
+ }).thenAccept(authorized -> {
+ if (!authorized) {
+ throw new RestException(Status.UNAUTHORIZED,
+ "Don't have permission to administrate resources on this tenant");
+ } else {
+ log.debug("Successfully authorized {} on tenant {}", clientAppId, tenant);
+ }
+ });
+ }
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ });
+ }
+
protected static void validateAdminAccessForTenant(PulsarService pulsar, String clientAppId,
String originalPrincipal, String tenant,
AuthenticationDataSource authenticationData)
@@ -795,18 +875,22 @@
return null;
}
- protected static void checkAuthorization(PulsarService pulsarService, TopicName topicName, String role,
- AuthenticationDataSource authenticationData) throws Exception {
+ protected static CompletableFuture<Void> checkAuthorizationAsync(PulsarService pulsarService,
+ TopicName topicName, String role,
+ AuthenticationDataSource authenticationData) {
if (!pulsarService.getConfiguration().isAuthorizationEnabled()) {
// No enforcing of authorization policies
- return;
+ return CompletableFuture.completedFuture(null);
}
// get zk policy manager
- if (!pulsarService.getBrokerService().getAuthorizationService().allowTopicOperation(topicName,
- TopicOperation.LOOKUP, null, role, authenticationData)) {
- log.warn("[{}] Role {} is not allowed to lookup topic", topicName, role);
- throw new RestException(Status.UNAUTHORIZED, "Don't have permission to connect to this namespace");
- }
+ return pulsarService.getBrokerService().getAuthorizationService().allowTopicOperationAsync(topicName,
+ TopicOperation.LOOKUP, null, role, authenticationData).thenAccept(allow -> {
+ if (!allow) {
+ log.warn("[{}] Role {} is not allowed to lookup topic", topicName, role);
+ throw new RestException(Status.UNAUTHORIZED,
+ "Don't have permission to connect to this namespace");
+ }
+ });
}
// Used for unit tests access