Co-author: @mattisonchao @Technoboy-
The Rest API was originally designed to be implemented asynchronously, but with the iteration of functions, some synchronous implementations were added, resulting in many asynchronous methods called synchronous implementations. Also, many synchronous calls do not add timeouts. This greatly reduces concurrency, user operations, and experience. In order to prevent more problems, and improve code readability and maintainability, we intend to refactor these synchronous calls and standardize the implementation of the API.
Related discussion: https://lists.apache.org/thread/pkkz2jgwtzpksp6d4rdm1pyxzb3z6vmg
Avoid synchronous method calls in asynchronous methods.
protected void internalDeleteNamespace(boolean authoritative) { validateTenantOperation(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE); validatePoliciesReadOnlyAccess(); }
Suggest to do like this:
protected CompletableFuture<Void> internalDeleteNamespace(boolean authoritative) { return validateTenantOperationAsync(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()); }
Async variable (AsyncResponse) is placed in the first parameter position
public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @QueryParam("force") @DefaultValue("false") boolean force, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
Async variable (AsyncResponse) cannot be substituted into method implementations
internalCreateNonPartitionedTopicAsync(asyncResponse, authoritative, properties);
Suggest to do like this:
internalCreateNonPartitionedTopicAsync(authoritative, properties) .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { resumeAsyncResponseExceptionally(asyncResponse, ex.getCause()); return null; });
Exception Some methods will validate ownership, like namespace ownership, topic ownership, so will throw REDIRECT exception. we need to filter this exception and not print log.
internalCreateNonPartitionedTopicAsync(authoritative, properties) .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { if (!isRedirectException(ex)) { log.error("Failed to xxx"); } resumeAsyncResponseExceptionally(asyncResponse, ex.getCause()); return null; });
In order to unify the modification and track the modified part, it's better to open an issue to track, like Apache/Pulsar#14353, Apache/Pulsar#14013, Apache/Pulsar#13854.