When a topic has a large number of producers or consumers (over 1k), querying the pulsarAdmin.topics().getPartitionedStats()
interface is slow and the response size is also large. As a result, it's essential to give users the option of querying producer and consumer information.
Add the API for org.apache.pulsar.client.admin.Topics
CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync( String topic, boolean perPartition, GetStatsOptions getStatsOptions); CompletableFuture<TopicStats> getStatsAsync(String topic, GetStatsOptions getStatsOptions);
None.
Implement the getPartitionedStatsAsync
method, and add the excludePublishers
and excludeConsumers
parameters to {tenant}/{namespace}/{topic}/partitioned-stats
API in PersistentTopics
and NonPersistentTopics
.
Add two fields for org.apache.pulsar.client.admin.GetStatsOptions
@Data @Builder public class GetStatsOptions { /** * Whether to exclude publishers. */ private final boolean excludePublishers; /** * Whether to exclude consumers. */ private final boolean excludeConsumers; }
Implement the getPartitionedStatsAsync
and getStatsAsync
interface for org.apache.pulsar.client.admin.internal.TopicsImpl
@Override public CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(String topic, boolean perPartition, GetStatsOptions getStatsOptions){ TopicName tn = validateTopic(topic); WebTarget path = topicPath(tn, "partitioned-stats"); path = path.queryParam("perPartition", perPartition) .queryParam("getPreciseBacklog", getStatsOptions.isGetPreciseBacklog()) .queryParam("subscriptionBacklogSize", getStatsOptions.isSubscriptionBacklogSize()) .queryParam("getEarliestTimeInBacklog", getStatsOptions.isGetEarliestTimeInBacklog()); .queryParam("excludePublishers", getStatsOptions.isExcludePublishers()) .queryParam("excludeConsumers", getStatsOptions.isExcludeConsumers()); } @Override public CompletableFuture<TopicStats> getStatsAsync(String topic, GetStatsOptions getStatsOptions){ TopicName tn = validateTopic(topic); WebTarget path = topicPath(tn, "stats") .queryParam("getPreciseBacklog", getStatsOptions.isGetPreciseBacklog()) .queryParam("subscriptionBacklogSize", getStatsOptions.isSubscriptionBacklogSize()) .queryParam("getEarliestTimeInBacklog", getStatsOptions.isGetEarliestTimeInBacklog()); .queryParam("excludePublishers",getStatsOptions.isExcludePublishers()) .queryParam("excludeConsumers",getStatsOptions.isExcludeConsumers()); }
Add the excludePublishers
and excludeConsumers
parameters to {tenant}/{namespace}/{topic}/partitioned-stats
API
@GET @Path("{tenant}/{namespace}/{topic}/partitioned-stats") public void getPartitionedStats( @Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Specify the tenant", required = true) @PathParam("tenant") String tenant, @ApiParam(value = "Specify the namespace", required = true) @PathParam("namespace") String namespace, @ApiParam(value = "Specify topic name", required = true) @PathParam("topic") @Encoded String encodedTopic, @ApiParam(value = "Get per partition stats") @QueryParam("perPartition") @DefaultValue("true") boolean perPartition, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "If return precise backlog or imprecise backlog") @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog, @ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful " + "not to use when there's heavy traffic.") @QueryParam("subscriptionBacklogSize") @DefaultValue("true") boolean subscriptionBacklogSize, @ApiParam(value = "If return the earliest time in backlog") @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog, @ApiParam(value = "If exclude the publishers") @QueryParam("excludePublishers") @DefaultValue("false") boolean excludePublishers, @ApiParam(value = "If exclude the consumers") @QueryParam("excludeConsumers") @DefaultValue("false") boolean excludeConsumers)
Add the excludePublishers
and excludeConsumers
parameters to {tenant}/{namespace}/{topic}/stats
API
@GET @Path("{tenant}/{namespace}/{topic}/stats") public void getStats( @Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Specify the tenant", required = true) @PathParam("tenant") String tenant, @ApiParam(value = "Specify the namespace", required = true) @PathParam("namespace") String namespace, @ApiParam(value = "Specify topic name", required = true) @PathParam("topic") @Encoded String encodedTopic, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "If return precise backlog or imprecise backlog") @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog, @ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful " + "not to use when there's heavy traffic.") @QueryParam("subscriptionBacklogSize") @DefaultValue("true") boolean subscriptionBacklogSize, @ApiParam(value = "If return time of the earliest message in backlog") @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog, @ApiParam(value = "If exclude the publishers"), @QueryParam("excludePublishers") @DefaultValue("false") boolean excludePublishers, @ApiParam(value = "If exclude the consumers") @QueryParam("excludeConsumers") @DefaultValue("false") boolean excludeConsumers)
Add a new method for org.apache.pulsar.broker.service.Topic
CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions getStatsOptions); @Data @Builder public class GetStatsOptions { /** * Set to true to get precise backlog, Otherwise get imprecise backlog. */ private final boolean getPreciseBacklog; /** * Whether to get backlog size for each subscription. */ private final boolean subscriptionBacklogSize; /** * Whether to get the earliest time in backlog. */ private final boolean getEarliestTimeInBacklog; /** * Whether to exclude publishers. */ private final boolean excludePublishers; /** * Whether to exclude consumers. */ private final boolean excludeConsumers; }
Add the following logic in org.apache.pulsar.broker.service.persistent.PersistentTopic.asyncGetStats
and org.apache.pulsar.broker.service.persistent.PersistentSubscription.getStats
:
if (!excludePublishers){ stats.addPublisher(publisherStats); } if (!excludeConsumers){ subStats.consumers.add(consumerStats); }