PIP-149: Making the REST Admin API fully async

Co-author: @mattisonchao @Technoboy-

Motivation

The REST API was originally designed to be implemented asynchronously. As features were added over time, synchronous implementations crept in, so many nominally asynchronous methods now call synchronous code. Many of these synchronous calls also have no timeout. This reduces concurrency and degrades the user experience. To prevent further problems and to improve code readability and maintainability, we propose refactoring these synchronous calls and standardizing how the API is implemented.

Related discussion: https://lists.apache.org/thread/pkkz2jgwtzpksp6d4rdm1pyxzb3z6vmg

Goals

  • Avoid synchronous method calls inside asynchronous methods.
  • Place the async parameter (AsyncResponse) first in the parameter list.
  • Do not pass the async parameter (AsyncResponse) into internal method implementations.
  • Add more tests and increase coverage.

Modification

  1. Avoid synchronous method calls in asynchronous methods.

    protected void internalDeleteNamespace(boolean authoritative) {
        // Synchronous checks: each call holds the calling thread until it returns.
        validateTenantOperation(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE);
        validatePoliciesReadOnlyAccess();
    }
    

    Suggested form:

    protected CompletableFuture<Void> internalDeleteNamespace(boolean authoritative) {
        return validateTenantOperationAsync(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE)
                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync());
    }
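
As a rough illustration of the chaining above, the following self-contained sketch composes two validation steps with `thenCompose`. The two `...Async` methods here are hypothetical stand-ins with simplified signatures (they are not the real Pulsar methods), and the `readOnly` flag exists only to simulate a failing check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AsyncValidationSketch {
    // Records the order in which the stand-in checks run.
    public static final List<String> calls = new ArrayList<>();

    // Hypothetical stand-in for validateTenantOperationAsync(...); always succeeds.
    public static CompletableFuture<Void> validateTenantOperationAsync(String tenant) {
        calls.add("tenant:" + tenant);
        return CompletableFuture.completedFuture(null);
    }

    // Hypothetical stand-in for validatePoliciesReadOnlyAccessAsync(); fails when readOnly is true.
    public static CompletableFuture<Void> validatePoliciesReadOnlyAccessAsync(boolean readOnly) {
        calls.add("policies");
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (readOnly) {
            result.completeExceptionally(new IllegalStateException("policies are read-only"));
        } else {
            result.complete(null);
        }
        return result;
    }

    // Chains the checks without blocking; a failure in either step fails the returned future.
    public static CompletableFuture<Void> internalDeleteNamespaceAsync(String tenant, boolean readOnly) {
        return validateTenantOperationAsync(tenant)
                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync(readOnly));
    }

    public static void main(String[] args) {
        System.out.println("writable failed: "
                + internalDeleteNamespaceAsync("public", false).isCompletedExceptionally());
        System.out.println("read-only failed: "
                + internalDeleteNamespaceAsync("public", true).isCompletedExceptionally());
    }
}
```

The caller receives a single future and decides how to react, so no step needs to block or to know about the HTTP response.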
    
  2. Place the async parameter (AsyncResponse) first in the parameter list.

    public void deleteNamespace(@Suspended final AsyncResponse asyncResponse,
                                @PathParam("tenant") String tenant,
                                @PathParam("namespace") String namespace,
                                @QueryParam("force") @DefaultValue("false") boolean force,
                                @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        // Method body omitted.
    }
    
    
  3. Do not pass the async parameter (AsyncResponse) into internal method implementations. The following form should be avoided:

    internalCreateNonPartitionedTopicAsync(asyncResponse, authoritative, properties);
    

    Suggested form:

    internalCreateNonPartitionedTopicAsync(authoritative, properties)
            .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
            .exceptionally(ex -> {
                resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
                return null;
            });
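
To make the separation concrete, here is a minimal sketch in which only the REST-layer method touches the response object. `FakeAsyncResponse` is a hypothetical stand-in for the JAX-RS `AsyncResponse`, and `internalCreateTopicAsync`, `unwrap`, and the `fail` flag are illustrative names that do not come from the Pulsar code base.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class ResumeAtRestLayerSketch {
    // Hypothetical stand-in for AsyncResponse; remembers what it was resumed with.
    public static class FakeAsyncResponse {
        public Object result;

        public boolean resume(Object response) {
            result = response;
            return true;
        }
    }

    // Internal logic returns a future and takes no response parameter.
    public static CompletableFuture<Void> internalCreateTopicAsync(boolean fail) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new IllegalArgumentException("topic already exists"));
        } else {
            future.complete(null);
        }
        return future;
    }

    // Dependent stages see failures wrapped in CompletionException; return the original cause.
    public static Throwable unwrap(Throwable ex) {
        return (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
    }

    // REST-layer method: the only place where the response is resumed.
    public static void createTopic(FakeAsyncResponse asyncResponse, boolean fail) {
        internalCreateTopicAsync(fail)
                .thenAccept(__ -> asyncResponse.resume("204 No Content"))
                .exceptionally(ex -> {
                    asyncResponse.resume(unwrap(ex));
                    return null;
                });
    }

    public static void main(String[] args) {
        FakeAsyncResponse response = new FakeAsyncResponse();
        createTopic(response, false);
        System.out.println(response.result);
    }
}
```

Because the internal method only returns a future, it can be unit tested and reused without constructing an HTTP response.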
    
  4. Exception handling. Some methods validate ownership (for example, namespace or topic ownership) and may therefore throw a REDIRECT exception. This exception is expected, so filter it out and do not log it as an error.

    internalCreateNonPartitionedTopicAsync(authoritative, properties)
            .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
            .exceptionally(ex -> {
                if (!isRedirectException(ex)) {
                    log.error("Failed to xxx");
                }
                resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
                return null;
            });
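
One possible shape for such a filter is sketched below. It assumes that a redirect surfaces as an exception carrying HTTP status 307; `StatusException`, `errorLog`, `failed`, and `handle` are hypothetical names used only for illustration, and this `isRedirectException` is a simplified stand-in rather than the project's actual helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class RedirectFilterSketch {
    // Hypothetical stand-in for a web exception that carries an HTTP status code.
    public static class StatusException extends RuntimeException {
        public final int status;

        public StatusException(int status, String message) {
            super(message);
            this.status = status;
        }
    }

    // Stand-in for the error log so the effect of the filter can be observed.
    public static final List<String> errorLog = new ArrayList<>();

    // Assumption: a redirect is signalled by status 307 (Temporary Redirect).
    public static boolean isRedirectException(Throwable ex) {
        Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                ? ex.getCause() : ex;
        return cause instanceof StatusException && ((StatusException) cause).status == 307;
    }

    // Helper that builds an already-failed future.
    public static CompletableFuture<Void> failed(Throwable t) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(t);
        return future;
    }

    // Logs every failure except redirects.
    public static void handle(CompletableFuture<Void> operation) {
        operation.exceptionally(ex -> {
            if (!isRedirectException(ex)) {
                errorLog.add("Failed: " + ex.getMessage());
            }
            return null;
        });
    }

    public static void main(String[] args) {
        handle(failed(new StatusException(307, "owned by another broker")));
        handle(failed(new IllegalStateException("metadata store unavailable")));
        System.out.println(errorLog);
    }
}
```

The check unwraps `CompletionException` first because a callback attached after another stage receives the wrapped form, while a callback attached directly to the failed future receives the raw exception.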

Task tracking

To coordinate the changes and keep track of which parts have been modified, open a tracking issue for each area, for example Apache/Pulsar#14353, Apache/Pulsar#14013, and Apache/Pulsar#13854.