Persistent helps to access topic which is a logical endpoint for publishing and consuming messages. Producers publish messages to the topic and consumers subscribe to the topic, to consume messages published to the topic.
In all of the instructions and commands below, the topic name structure is:
{% include topic.html p=“property” c=“cluster” n=“namespace” t=“topic” %}
It provides a list of persistent topics exist under a given namespace.
List of topics can be fetched using list
command.
$ pulsar-admin persistent list \ my-property/my-cluster/my-namespace \ my-topic
{% endpoint GET /admin/persistent/:property/:cluster/:namespace %}
String namespace = "my-property/my-cluster-my-namespace"; admin.persistentTopics().getList(namespace);
It grants permissions on a client role to perform specific actions on a given topic.
Permission can be granted using grant-permission
command.
$ pulsar-admin persistent grant-permission \ --actions produce,consume --role application1 \ persistent://test-property/cl1/ns1/tp1 \
{% endpoint POST /admin/namespaces/:property/:cluster/:namespace/permissions/:role %}
String destination = "persistent://my-property/my-cluster-my-namespace/my-topic"; String role = "test-role"; Set<AuthAction> actions = Sets.newHashSet(AuthAction.produce, AuthAction.consume); admin.persistentTopics().grantPermission(destination, role, actions);
Permission can be fetched using permissions
command.
TODO: admin
$ pulsar-admin persistent permissions \ persistent://test-property/cl1/ns1/tp1 \ { "application1": [ "consume", "produce" ] }
{% endpoint GET /admin/namespaces/:property/:cluster/:namespace/permissions %}
String destination = "persistent://my-property/my-cluster-my-namespace/my-topic"; admin.persistentTopics().getPermissions(destination);
It revokes a permission which was granted on a client role.
Permission can be revoked using revoke-permission
command.
$ pulsar-admin persistent revoke-permission \ --role application1 \ persistent://test-property/cl1/ns1/tp1 \ { "application1": [ "consume", "produce" ] }
{% endpoint DELETE /admin/namespaces/:property/:cluster/:namespace/permissions/:role %}
String destination = "persistent://my-property/my-cluster-my-namespace/my-topic"; String role = "test-role"; admin.persistentTopics().revokePermissions(destination, role);
It deletes a topic. The topic cannot be deleted if there's any active subscription or producers connected to it.
Topic can be deleted using delete
command.
$ pulsar-admin persistent delete \ persistent://test-property/cl1/ns1/tp1 \
{% endpoint DELETE /admin/persistent/:property/:cluster/:namespace/:destination %}
String destination = "persistent://my-property/my-cluster-my-namespace/my-topic"; admin.persistentTopics().delete(destination);
It unloads a topic.
Topic can be unloaded using unload
command.
$ pulsar-admin persistent unload \ persistent://test-property/cl1/ns1/tp1 \
{% endpoint PUT /admin/persistent/:property/:cluster/:namespace/:destination/unload %}
String destination = "persistent://my-property/my-cluster-my-namespace/my-topic"; admin.persistentTopics().unload(destination);
It shows current statistics of a given non-partitioned topic.
msgRateIn: The sum of all local and replication publishers' publish rates in messages per second
msgThroughputIn: Same as above, but in bytes per second instead of messages per second
msgRateOut: The sum of all local and replication consumers' dispatch rates in messages per second
msgThroughputOut: Same as above, but in bytes per second instead of messages per second
averageMsgSize: The average size in bytes of messages published within the last interval
storageSize: The sum of the ledgers' storage size for this topic. See
publishers: The list of all local publishers into the topic. There can be zero or thousands
averageMsgSize: Average message size in bytes from this publisher within the last interval
producerId: Internal identifier for this producer on this topic
producerName: Internal identifier for this producer, generated by the client library
address: IP address and source port for the connection of this producer
connectedSince: Timestamp this producer was created or last reconnected
subscriptions: The list of all local subscriptions to the topic
my-subscription: The name of this subscription (client defined)
msgBacklog: The count of messages in backlog for this subscription
type: This subscription type
msgRateExpired: The rate at which messages were discarded instead of dispatched from this subscription due to TTL
consumers: The list of connected consumers for this subscription
consumerName: Internal identifier for this consumer, generated by the client library
availablePermits: The number of messages this consumer has space for in the client library‘s listen queue. A value of 0 means the client library’s queue is full and receive() isn't being called. A nonzero value means this consumer is ready to be dispatched messages.
replication: This section gives the stats for cross-colo replication of this topic
replicationBacklog: The outbound replication backlog in messages
connected: Whether the outbound replicator is connected
replicationDelayInSeconds: How long the oldest message has been waiting to be sent through the connection, if connected is true
inboundConnection: The IP and port of the broker in the remote cluster's publisher connection to this broker
inboundConnectedSince: The TCP connection being used to publish messages to the remote cluster. If there are no local publishers connected, this connection is automatically closed after a minute.
{ "msgRateIn": 4641.528542257553, "msgThroughputIn": 44663039.74947473, "msgRateOut": 0, "msgThroughputOut": 0, "averageMsgSize": 1232439.816728665, "storageSize": 135532389160, "publishers": [ { "msgRateIn": 57.855383881403576, "msgThroughputIn": 558994.7078932219, "averageMsgSize": 613135, "producerId": 0, "producerName": null, "address": null, "connectedSince": null } ], "subscriptions": { "my-topic_subscription": { "msgRateOut": 0, "msgThroughputOut": 0, "msgBacklog": 116632, "type": null, "msgRateExpired": 36.98245516804671, "consumers": [] } }, "replication": {} }
Topic stats can be fetched using stats
command.
$ pulsar-admin persistent stats \ persistent://test-property/cl1/ns1/tp1 \
{% endpoint GET /admin/persistent/:property/:cluster/:namespace/:destination/stats %}
String destination = "persistent://my-property/my-cluster-my-namespace/my-topic"; admin.persistentTopics().getStats(destination);
It shows detailed statistics of a topic.
entriesAddedCounter: Messages published since this broker loaded this topic
numberOfEntries: Total number of messages being tracked
totalSize: Total storage size in bytes of all messages
currentLedgerEntries: Count of messages written to the ledger currently open for writing
currentLedgerSize: Size in bytes of messages written to ledger currently open for writing
lastLedgerCreatedTimestamp: time when last ledger was created
lastLedgerCreationFailureTimestamp: time when last ledger was failed
waitingCursorsCount: How many cursors are “caught up” and waiting for a new message to be published
pendingAddEntriesCount: How many messages have (asynchronous) write requests we are waiting on completion
lastConfirmedEntry: The ledgerid:entryid of the last message successfully written. If the entryid is -1, then the ledger has been opened or is currently being opened but has no entries written yet.
state: The state of this ledger for writing. LedgerOpened means we have a ledger open for saving published messages.
ledgers: The ordered list of all ledgers for this topic holding its messages
cursors: The list of all cursors on this topic. There will be one for every subscription you saw in the topic stats.
markDeletePosition: The ack position: the last message the subscriber acknowledged receiving
readPosition: The latest position of subscriber for reading message
waitingReadOp: This is true when the subscription has read the latest message published to the topic and is waiting on new messages to be published.
pendingReadOps: The counter for how many outstanding read requests to the BookKeepers we have in progress
messagesConsumedCounter: Number of messages this cursor has acked since this broker loaded this topic
cursorLedger: The ledger being used to persistently store the current markDeletePosition
cursorLedgerLastEntry: The last entryid used to persistently store the current markDeletePosition
individuallyDeletedMessages: If Acks are being done out of order, shows the ranges of messages Acked between the markDeletePosition and the read-position
lastLedgerSwitchTimestamp: The last time the cursor ledger was rolled over
state: The state of the cursor ledger: Open means we have a cursor ledger for saving updates of the markDeletePosition.
{ "entriesAddedCounter": 20449518, "numberOfEntries": 3233, "totalSize": 331482, "currentLedgerEntries": 3233, "currentLedgerSize": 331482, "lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825", "lastLedgerCreationFailureTimestamp": null, "waitingCursorsCount": 1, "pendingAddEntriesCount": 0, "lastConfirmedEntry": "324711539:3232", "state": "LedgerOpened", "ledgers": [ { "ledgerId": 324711539, "entries": 0, "size": 0 } ], "cursors": { "my-subscription": { "markDeletePosition": "324711539:3133", "readPosition": "324711539:3233", "waitingReadOp": true, "pendingReadOps": 0, "messagesConsumedCounter": 20449501, "cursorLedger": 324702104, "cursorLedgerLastEntry": 21, "individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]", "lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313", "state": "Open" } } }
Topic internal-stats can be fetched using stats-internal
command.
$ pulsar-admin persistent stats-internal \ persistent://test-property/cl1/ns1/tp1 \
{% endpoint GET /admin/persistent/:property/:cluster/:namespace/:destination/internalStats %}
String destination = "persistent://my-property/my-cluster-my-namespace/my-topic"; admin.persistentTopics().getInternalStats(destination);
It peeks N messages for a specific subscription of a given topic.
$ pulsar-admin persistent peek-messages \ --count 10 --subscription my-subscription \ persistent://test-property/cl1/ns1/tp1 \ Message ID: 315674752:0 Properties: { "X-Pulsar-publish-time" : "2015-07-13 17:40:28.451" } msg-payload
{% endpoint GET /admin/persistent/:property/:cluster/:namespace/:destination/subscription/:subName/position/:messagePosition %}
String destination = "persistent://my-property/my-cluster-my-namespace/my-topic"; String subName = "my-subscription"; int numMessages = 1; admin.persistentTopics().peekMessages(destination, subName, numMessages);
It skips N messages for a specific subscription of a given topic.
$ pulsar-admin persistent skip \ --count 10 --subscription my-subscription \ persistent://test-property/cl1/ns1/tp1 \
{% endpoint POST /admin/persistent/:property/:cluster/:namespace/:destination/subscription/:subName/skip/:numMessages %}
String destination = "persistent://my-property/my-cluster-my-namespace/my-topic"; String subName = "my-subscription"; int numMessages = 1; admin.persistentTopics().skipMessages(destination, subName, numMessages);
It skips all old messages for a specific subscription of a given topic.
$ pulsar-admin persistent skip-all \ --subscription my-subscription \ persistent://test-property/cl1/ns1/tp1 \
{% endpoint POST /admin/persistent/:property/:cluster/:namespace/:destination/subscription/:subName/skip_all %}
String destination = "persistent://my-property/my-cluster-my-namespace/my-topic"; String subName = "my-subscription"; admin.persistentTopics().skipAllMessages(destination, subName);
It resets a subscription’s cursor position back to the position which was recorded X minutes before. It essentially calculates time and position of cursor at X minutes before and resets it at that position.
$ pulsar-admin persistent reset-cursor \ --subscription my-subscription --time 10 \ persistent://test-property/cl1/ns1/tp1 \
{% endpoint POST /admin/persistent/:property/:cluster/:namespace/:destination/subscription/:subName/resetcursor/:timestamp %}
String destination = "persistent://my-property/my-cluster-my-namespace/my-topic"; String subName = "my-subscription"; long timestamp = 2342343L; admin.persistentTopics().skipAllMessages(destination, subName, timestamp);
It locates broker url which is serving the given topic.
$ pulsar-admin persistent lookup \ persistent://test-property/cl1/ns1/tp1 \ "pulsar://broker1.org.com:4480"
{% endpoint GET /lookup/v2/destination/persistent/:property/:cluster/:namespace/:destination %}
String destination = "persistent://my-property/my-cluster-my-namespace/my-topic"; admin.lookup().lookupDestination(destination);
It gives range of the bundle which contains given topic
$ pulsar-admin persistent bundle-range \ persistent://test-property/cl1/ns1/tp1 \ "0x00000000_0xffffffff"
{% endpoint GET /lookup/v2/destination/:destination_domain/:property/:cluster/:namespace/:destination/bundle %}
String destination = "persistent://my-property/my-cluster-my-namespace/my-topic"; admin.lookup().getBundleRange(destination);
It shows all subscription names for a given topic.
$ pulsar-admin persistent subscriptions \ persistent://test-property/cl1/ns1/tp1 \ my-subscription
{% endpoint GET /admin/persistent/:property/:cluster/:namespace/:destination/subscriptions %}
String destination = "persistent://my-property/my-cluster-my-namespace/my-topic"; admin.persistentTopics().getSubscriptions(destination);
It can also help to unsubscribe a subscription which is no more processing further messages.
$ pulsar-admin persistent unsubscribe \ --subscription my-subscription \ persistent://test-property/cl1/ns1/tp1 \
{% endpoint POST /admin/namespaces/:property/:cluster/:namespace/unsubscribe/:subscription %}
String destination = "persistent://my-property/my-cluster-my-namespace/my-topic"; String subscriptionName = "my-subscription"; admin.persistentTopics().deleteSubscription(destination, subscriptionName);