Motivation

When a topic has a large number of producers or consumers (over 1k), querying the pulsarAdmin.topics().getPartitionedStats() interface is slow and the response size is also large. As a result, it's essential to give users the option of querying producer and consumer information.

Goals

In Scope

Add the API for org.apache.pulsar.client.admin.Topics

CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(
        String topic, boolean perPartition, GetStatsOptions getStatsOptions);

CompletableFuture<TopicStats> getStatsAsync(String topic, GetStatsOptions getStatsOptions);

Out of Scope

None.

High Level Design

Implement the getPartitionedStatsAsync method, and add the excludePublishers and excludeConsumers parameters to {tenant}/{namespace}/{topic}/partitioned-stats API in PersistentTopics and NonPersistentTopics.

Detailed Design

Design & Implementation Details

Add two fields for org.apache.pulsar.client.admin.GetStatsOptions

@Data
@Builder
public class GetStatsOptions {
    /**
     * Whether to exclude publishers.
     */
    private final boolean excludePublishers;

    /**
     * Whether to exclude consumers.
     */
    private final boolean excludeConsumers;
    
}

Implement the getPartitionedStatsAsync and getStatsAsync interface for org.apache.pulsar.client.admin.internal.TopicsImpl

@Override
public CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(String topic, boolean perPartition, GetStatsOptions getStatsOptions){
    TopicName tn = validateTopic(topic);
    WebTarget path = topicPath(tn, "partitioned-stats");
    path = path.queryParam("perPartition", perPartition)
            .queryParam("getPreciseBacklog", getStatsOptions.isGetPreciseBacklog())
            .queryParam("subscriptionBacklogSize", getStatsOptions.isSubscriptionBacklogSize())
            .queryParam("getEarliestTimeInBacklog", getStatsOptions.isGetEarliestTimeInBacklog());
            .queryParam("excludePublishers", getStatsOptions.isExcludePublishers())
            .queryParam("excludeConsumers", getStatsOptions.isExcludeConsumers());
}

@Override
public CompletableFuture<TopicStats> getStatsAsync(String topic, GetStatsOptions getStatsOptions){
    TopicName tn = validateTopic(topic);
    WebTarget path = topicPath(tn, "stats")
            .queryParam("getPreciseBacklog", getStatsOptions.isGetPreciseBacklog())
            .queryParam("subscriptionBacklogSize", getStatsOptions.isSubscriptionBacklogSize())
            .queryParam("getEarliestTimeInBacklog", getStatsOptions.isGetEarliestTimeInBacklog());
            .queryParam("excludePublishers",getStatsOptions.isExcludePublishers())
            .queryParam("excludeConsumers",getStatsOptions.isExcludeConsumers());
}        

Add the excludePublishers and excludeConsumers parameters to {tenant}/{namespace}/{topic}/partitioned-stats API

@GET
@Path("{tenant}/{namespace}/{topic}/partitioned-stats")
public void getPartitionedStats(
        @Suspended final AsyncResponse asyncResponse,
        @ApiParam(value = "Specify the tenant", required = true)
        @PathParam("tenant") String tenant,
        @ApiParam(value = "Specify the namespace", required = true)
        @PathParam("namespace") String namespace,
        @ApiParam(value = "Specify topic name", required = true)
        @PathParam("topic") @Encoded String encodedTopic,
        @ApiParam(value = "Get per partition stats")
        @QueryParam("perPartition") @DefaultValue("true") boolean perPartition,
        @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
        @ApiParam(value = "If return precise backlog or imprecise backlog")
        @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
        @ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
                + "not to use when there's heavy traffic.")
        @QueryParam("subscriptionBacklogSize") @DefaultValue("true") boolean subscriptionBacklogSize,
        @ApiParam(value = "If return the earliest time in backlog")
        @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog,
        @ApiParam(value = "If exclude the publishers")
        @QueryParam("excludePublishers") @DefaultValue("false") boolean excludePublishers,
        @ApiParam(value = "If exclude the consumers")
        @QueryParam("excludeConsumers") @DefaultValue("false") boolean excludeConsumers)

Add the excludePublishers and excludeConsumers parameters to {tenant}/{namespace}/{topic}/stats API

@GET
@Path("{tenant}/{namespace}/{topic}/stats")
public void getStats(
        @Suspended final AsyncResponse asyncResponse,
        @ApiParam(value = "Specify the tenant", required = true)
        @PathParam("tenant") String tenant,
        @ApiParam(value = "Specify the namespace", required = true)
        @PathParam("namespace") String namespace,
        @ApiParam(value = "Specify topic name", required = true)
        @PathParam("topic") @Encoded String encodedTopic,
        @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
        @ApiParam(value = "If return precise backlog or imprecise backlog")
        @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
        @ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
                + "not to use when there's heavy traffic.")
        @QueryParam("subscriptionBacklogSize") @DefaultValue("true") boolean subscriptionBacklogSize,
        @ApiParam(value = "If return time of the earliest message in backlog")
        @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog,
        @ApiParam(value = "If exclude the publishers"),
        @QueryParam("excludePublishers") @DefaultValue("false") boolean excludePublishers,
        @ApiParam(value = "If exclude the consumers")
        @QueryParam("excludeConsumers") @DefaultValue("false") boolean excludeConsumers)

Add a new method for org.apache.pulsar.broker.service.Topic

CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions getStatsOptions);

@Data
@Builder
public class GetStatsOptions {
    /**
    * Set to true to get precise backlog, Otherwise get imprecise backlog.
    */
    private final boolean getPreciseBacklog;

    /**
     * Whether to get backlog size for each subscription.
     */
    private final boolean subscriptionBacklogSize;

    /**
     * Whether to get the earliest time in backlog.
     */
    private final boolean getEarliestTimeInBacklog;

    /**
     * Whether to exclude publishers.
     */
    private final boolean excludePublishers;

    /**
     * Whether to exclude consumers.
     */
    private final boolean excludeConsumers;

}

Add the following logic in org.apache.pulsar.broker.service.persistent.PersistentTopic.asyncGetStats and org.apache.pulsar.broker.service.persistent.PersistentSubscription.getStats:

    if (!excludePublishers){
        stats.addPublisher(publisherStats);
    }
    
    if (!excludeConsumers){
        subStats.consumers.add(consumerStats);
    }

Public-facing Changes

Public API

Binary protocol

Configuration

CLI

Metrics

Monitoring

Security Considerations

Backward & Forward Compatibility

Revert

Upgrade

Alternatives

General Notes

Links