import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
:::tip

This page only shows some frequently used operations.

For the latest and complete information about Pulsar admin, including commands, flags, descriptions, and more, see Pulsar admin doc.

For the latest and complete information about REST API, including parameters, responses, samples, and more, see {@inject: rest:REST:/} API doc.

For the latest and complete information about Java admin API, including classes, methods, descriptions, and more, see Java admin API doc.

:::
In a production environment, some long-lasting transactions may never complete. You can get the slow transactions that have stayed open longer than a given time, under one coordinator or all coordinators, in the following ways.
<Tabs groupId="api-choice" defaultValue="pulsar-admin" values={[{"label":"pulsar-admin","value":"pulsar-admin"},{"label":"REST API","value":"REST API"},{"label":"Java","value":"Java"}]}>
<TabItem value="pulsar-admin">

```shell
$ pulsar-admin transactions slow-transactions \
    -c 1 -t 1s
```

</TabItem>
<TabItem value="REST API">

{@inject: endpoint|GET|/admin/v3/transactions/slowTransactions/:timeout|operation/getSlowTransactions?version=@pulsar:version_number@}

</TabItem>
<TabItem value="Java">

```java
admin.transactions().getSlowTransactionsByCoordinatorId(coordinatorId, timeout, timeUnit);
// Or get slow transactions from all coordinators.
admin.transactions().getSlowTransactions(timeout, timeUnit);
```

</TabItem>
</Tabs>
The following is an example of the returned values.
{ "(0,3)": { "txnId": "(0,3)", "status": "OPEN", "openTimestamp": 1658120122474, "timeoutAt": 300000, "producedPartitions": {}, "ackedPartitions": {} }, "(0,2)": { "txnId": "(0,2)", "status": "OPEN", "openTimestamp": 1658120122471, "timeoutAt": 300000, "producedPartitions": {}, "ackedPartitions": {} }, "(0,5)": { "txnId": "(0,5)", "status": "OPEN", "openTimestamp": 1658120122478, "timeoutAt": 300000, "producedPartitions": {}, "ackedPartitions": {} }, "(0,4)": { "txnId": "(0,4)", "status": "OPEN", "openTimestamp": 1658120122476, "timeoutAt": 300000, "producedPartitions": {}, "ackedPartitions": {} }, "(0,7)": { "txnId": "(0,7)", "status": "OPEN", "openTimestamp": 1658120122482, "timeoutAt": 300000, "producedPartitions": {}, "ackedPartitions": {} }, "(0,10)": { "txnId": "(0,10)", "status": "OPEN", "openTimestamp": 1658120122488, "timeoutAt": 300000, "producedPartitions": {}, "ackedPartitions": {} }, "(0,6)": { "txnId": "(0,6)", "status": "OPEN", "openTimestamp": 1658120122480, "timeoutAt": 300000, "producedPartitions": {}, "ackedPartitions": {} }, "(0,9)": { "txnId": "(0,9)", "status": "OPEN", "openTimestamp": 1658120122486, "timeoutAt": 300000, "producedPartitions": {}, "ackedPartitions": {} }, "(0,8)": { "txnId": "(0,8)", "status": "OPEN", "openTimestamp": 1658120122484, "timeoutAt": 300000, "producedPartitions": {}, "ackedPartitions": {} }, "(0,11)": { "txnId": "(0,11)", "status": "OPEN", "openTimestamp": 1658120122490, "timeoutAt": 300000, "producedPartitions": {}, "ackedPartitions": {} } }
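When reading this output, it can help to work out how long each transaction has actually been open. The following is a minimal sketch, assuming `openTimestamp` is an epoch time in milliseconds and that "slow" means open for longer than the timeout you passed. `SlowTxnCheck` and its methods are hypothetical helpers, not part of the Pulsar API.

```java
import java.time.Duration;

// Hypothetical helper for illustration; not part of the Pulsar admin API.
final class SlowTxnCheck {
    private SlowTxnCheck() {}

    /** Returns how long a transaction has been open (assumes epoch milliseconds). */
    static Duration openFor(long openTimestampMillis, long nowMillis) {
        return Duration.ofMillis(nowMillis - openTimestampMillis);
    }

    /** Returns true when the transaction has been open longer than the threshold. */
    static boolean isSlow(long openTimestampMillis, long nowMillis, Duration threshold) {
        return openFor(openTimestampMillis, nowMillis).compareTo(threshold) > 0;
    }

    public static void main(String[] args) {
        long openTimestamp = 1658120122474L; // "openTimestamp" of txn (0,3) in the sample output
        long now = System.currentTimeMillis();
        System.out.println("open for " + openFor(openTimestamp, now).getSeconds() + " s");
        System.out.println("slow (> 1s): " + isSlow(openTimestamp, now, Duration.ofSeconds(1)));
    }
}
```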
When transaction performance reaches a bottleneck because there are too few transaction coordinators, you can scale the number of transaction coordinators in the following ways.
<Tabs groupId="api-choice" defaultValue="pulsar-admin" values={[{"label":"pulsar-admin","value":"pulsar-admin"},{"label":"REST API","value":"REST API"},{"label":"Java","value":"Java"}]}>
<TabItem value="pulsar-admin">

```shell
$ pulsar-admin transactions scale-transactionCoordinators \
    -r 17
```

</TabItem>
<TabItem value="REST API">

{@inject: endpoint|GET|/admin/v3/transactions/transactionCoordinator/:replicas|operation/scaleTransactionCoordinators?version=@pulsar:version_number@}

</TabItem>
<TabItem value="Java">

```java
admin.transactions().scaleTransactionCoordinators(replicas);
```

</TabItem>
</Tabs>
The transaction metadata that can be retrieved include:
Use one of the following ways to get your transaction metadata.
<Tabs groupId="api-choice" defaultValue="pulsar-admin" values={[{"label":"pulsar-admin","value":"pulsar-admin"},{"label":"REST API","value":"REST API"},{"label":"Java","value":"Java"}]}>
<TabItem value="pulsar-admin">

```shell
$ pulsar-admin transactions transaction-metadata \
    -m 1 -l 1
```

</TabItem>
<TabItem value="REST API">

{@inject: endpoint|GET|/admin/v3/transactions/transactionMetadata/:mostSigBits/:leastSigBits|operation/getTransactionMetadata?version=@pulsar:version_number@}

</TabItem>
<TabItem value="Java">

```java
admin.transactions().getTransactionMetadata(txnID);
```

</TabItem>
</Tabs>
The following is an example of the returned values.
{ "txnId" : "(1,18)", "status" : "ABORTING", "openTimestamp" : 1656592983374, "timeoutAt" : 5000, "producedPartitions" : { "my-topic" : { "startPosition" : "127:4959", "aborted" : true } }, "ackedPartitions" : { "my-topic" : { "mysubName" : { "cumulativeAckPosition" : null } } } }
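The `txnId` values in these responses can be turned back into the `-m` and `-l` arguments or into the REST path. The following is a minimal sketch, assuming a `txnId` is printed as `(mostSigBits,leastSigBits)`. `TxnIdText` is a hypothetical helper, not part of the Pulsar API.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; assumes txnId text has the form "(mostSigBits,leastSigBits)".
final class TxnIdText {
    private static final Pattern FORMAT = Pattern.compile("\\((\\d+),(\\d+)\\)");

    private TxnIdText() {}

    /** Returns {mostSigBits, leastSigBits} parsed from text such as "(1,18)". */
    static long[] parse(String txnId) {
        Matcher m = FORMAT.matcher(txnId.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected txnId format: " + txnId);
        }
        return new long[] {Long.parseLong(m.group(1)), Long.parseLong(m.group(2))};
    }

    /** Fills in the path template shown in the REST API tab. */
    static String metadataPath(String txnId) {
        long[] bits = parse(txnId);
        return "/admin/v3/transactions/transactionMetadata/" + bits[0] + "/" + bits[1];
    }

    public static void main(String[] args) {
        long[] bits = parse("(1,18)");
        System.out.println("-m " + bits[0] + " -l " + bits[1]);
        System.out.println(metadataPath("(1,18)"));
    }
}
```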
The transaction stats in transaction pending ack that can be retrieved include:
Use one of the following ways to get transaction stats in pending ack:
<Tabs groupId="api-choice" defaultValue="pulsar-admin" values={[{"label":"pulsar-admin","value":"pulsar-admin"},{"label":"REST API","value":"REST API"},{"label":"Java","value":"Java"}]}>
<TabItem value="pulsar-admin">

```shell
$ pulsar-admin transactions transaction-in-pending-ack-stats \
    -m 1 -l 1 -t my-topic -s mysubname
```

</TabItem>
<TabItem value="REST API">

{@inject: endpoint|GET|/admin/v3/transactions/transactionInPendingAckStats/:tenant/:namespace/:topic/:subName/:mostSigBits/:leastSigBits|operation/getTransactionInPendingAckStats?version=@pulsar:version_number@}

</TabItem>
<TabItem value="Java">

```java
admin.transactions().getTransactionInPendingAckStats(txnID, topic, subname);
```

</TabItem>
</Tabs>
The following is an example of the returned value.
{ "cumulativeAckPosition" : "137:49959" }
The transaction stats in the transaction buffer that can be retrieved include:
Use one of the following ways to get transaction stats in transaction buffer:
<Tabs groupId="api-choice" defaultValue="pulsar-admin" values={[{"label":"pulsar-admin","value":"pulsar-admin"},{"label":"REST API","value":"REST API"},{"label":"Java","value":"Java"}]}>
<TabItem value="pulsar-admin">

```shell
$ pulsar-admin transactions transaction-in-buffer-stats \
    -m 1 -l 1 -t my-topic
```

</TabItem>
<TabItem value="REST API">

{@inject: endpoint|GET|/admin/v3/transactions/transactionInBufferStats/:tenant/:namespace/:topic/:mostSigBits/:leastSigBits|operation/getTransactionInBufferStats?version=@pulsar:version_number@}

</TabItem>
<TabItem value="Java">

```java
admin.transactions().getTransactionInBufferStatsAsync(txnID, topic);
```

</TabItem>
</Tabs>
The following is an example of the returned values.
{ "startPosition" : "137:49759", "aborted" : false }
The transaction coordinator (TC) is a module inside a Pulsar broker. It maintains the entire life cycle of transactions and handles transaction timeout.
The transaction coordinator stats that can be retrieved include:
For `recoverStartTime` and `recoverEndTime`, `0L` means no startup.

Use one of the following ways to get transaction coordinator stats:
<Tabs groupId="api-choice" defaultValue="pulsar-admin" values={[{"label":"pulsar-admin","value":"pulsar-admin"},{"label":"REST API","value":"REST API"},{"label":"Java","value":"Java"}]}>
<TabItem value="pulsar-admin">

```shell
$ pulsar-admin transactions coordinator-stats \
    -c 1
```

</TabItem>
<TabItem value="REST API">

{@inject: endpoint|GET|/admin/v3/transactions/coordinatorStats|operation/getCoordinatorStats?version=@pulsar:version_number@}

</TabItem>
<TabItem value="Java">

```java
admin.transactions().getCoordinatorStatsById(coordinatorId);
// Or get all coordinator stats.
admin.transactions().getCoordinatorStats();
```

</TabItem>
</Tabs>
The following is an example of the returned values.
{ "state" : "Ready", "leastSigBits" : 1, "lowWaterMark" : 0, "ongoingTxnSize" : 0, "recoverStartTime" : 1657021892377, "recoverEndTime" : 1657021892378 }
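The two recovery timestamps can be combined into a recovery duration. The following is a minimal sketch, assuming both fields are epoch times in milliseconds and treating `0L` as "no startup", as this page describes. `RecoveryTime` is a hypothetical helper, not part of the Pulsar API.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical helper; assumes both timestamps are epoch milliseconds.
final class RecoveryTime {
    private RecoveryTime() {}

    /** Returns the recovery duration, or empty if either timestamp is 0L (no startup). */
    static Optional<Duration> elapsed(long recoverStartTime, long recoverEndTime) {
        if (recoverStartTime == 0L || recoverEndTime == 0L) {
            return Optional.empty();
        }
        return Optional.of(Duration.ofMillis(recoverEndTime - recoverStartTime));
    }

    public static void main(String[] args) {
        // Values taken from the sample coordinator stats.
        Optional<Duration> d = elapsed(1657021892377L, 1657021892378L);
        System.out.println(d.map(x -> x.toMillis() + " ms").orElse("recovery not finished"));
    }
}
```

With the sample values, the two timestamps differ by 1 millisecond.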
The coordinator's internal stats that can be retrieved include:
See [managedLedgerInternalStats](admin-api-topics.md#get-internal-stats) for more details.

Use one of the following ways to get the coordinator's internal stats:
<Tabs groupId="api-choice" defaultValue="pulsar-admin" values={[{"label":"pulsar-admin","value":"pulsar-admin"},{"label":"REST API","value":"REST API"},{"label":"Java","value":"Java"}]}>
<TabItem value="pulsar-admin">

```shell
$ pulsar-admin transactions coordinator-internal-stats \
    -c 1 -m
```

</TabItem>
<TabItem value="REST API">

{@inject: endpoint|GET|/admin/v3/transactions/coordinatorInternalStats/:coordinatorId|operation/getCoordinatorInternalStats?version=@pulsar:version_number@}

</TabItem>
<TabItem value="Java">

```java
admin.transactions().getCoordinatorInternalStats(coordinatorId, metadata);
```

</TabItem>
</Tabs>
The following is an example of the returned values.
{ "transactionLogStats" : { "managedLedgerName" : "pulsar/system/persistent/__transaction_log_1", "managedLedgerInternalStats" : { "entriesAddedCounter" : 3, "numberOfEntries" : 3, "totalSize" : 63, "currentLedgerEntries" : 3, "currentLedgerSize" : 63, "lastLedgerCreatedTimestamp" : "2022-06-30T18:18:05.88+08:00", "waitingCursorsCount" : 0, "pendingAddEntriesCount" : 0, "lastConfirmedEntry" : "13:2", "state" : "LedgerOpened", "ledgers" : [ { "ledgerId" : 13, "entries" : 0, "size" : 0, "offloaded" : false, "metadata" : "LedgerMetadata{formatVersion=3, ensembleSize=1, writeQuorumSize=1, ackQuorumSize=1, state=CLOSED, length=63, lastEntryId=2, digestType=CRC32C, password=OMITTED, ensembles={0=[10.20.240.119:3181]}, customMetadata={component=base64:bWFuYWdlZC1sZWRnZXI=, pulsar/managed-ledger=base64:cHVsc2FyL3N5c3RlbS9wZXJzaXN0ZW50L19fdHJhbnNhY3Rpb25fbG9nXzE=, application=base64:cHVsc2Fy}}", "underReplicated" : false } ], "cursors" : { "transaction.subscription" : { "markDeletePosition" : "13:2", "readPosition" : "13:3", "waitingReadOp" : false, "pendingReadOps" : 0, "messagesConsumedCounter" : 3, "cursorLedger" : 22, "cursorLedgerLastEntry" : 1, "individuallyDeletedMessages" : "[]", "lastLedgerSwitchTimestamp" : "2022-06-30T18:18:05.932+08:00", "state" : "Open", "numberOfEntriesSinceFirstNotAckedMessage" : 1, "totalNonContiguousDeletedMessagesRange" : 0, "subscriptionHavePendingRead" : false, "subscriptionHavePendingReplayRead" : false, "properties" : { } } } } } }
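The `metadata` string of each ledger carries `customMetadata` entries encoded as `base64:` values. The following is a minimal sketch for decoding them, assuming the `key=base64:value` layout seen in the sample output. `LedgerCustomMetadata` is a hypothetical helper, not part of the Pulsar API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; assumes entries look like "key=base64:VALUE" as in the sample output.
final class LedgerCustomMetadata {
    private static final Pattern ENTRY =
            Pattern.compile("([\\w/.-]+)=base64:([A-Za-z0-9+/]+=*)");

    private LedgerCustomMetadata() {}

    /** Returns the decoded key/value pairs in the order they appear in the metadata string. */
    static Map<String, String> decode(String metadata) {
        Map<String, String> decoded = new LinkedHashMap<>();
        Matcher m = ENTRY.matcher(metadata);
        while (m.find()) {
            byte[] raw = Base64.getDecoder().decode(m.group(2));
            decoded.put(m.group(1), new String(raw, StandardCharsets.UTF_8));
        }
        return decoded;
    }

    public static void main(String[] args) {
        // Shortened excerpt of the "metadata" field from the sample output.
        String sample = "customMetadata={component=base64:bWFuYWdlZC1sZWRnZXI=, "
                + "application=base64:cHVsc2Fy}}";
        decode(sample).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

For the excerpt above, `component` decodes to `managed-ledger` and `application` decodes to `pulsar`.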
Pending ack maintains message acknowledgments within a transaction before a transaction completes. If a message is in the pending acknowledge state, the message cannot be acknowledged by other transactions until the message is removed from the pending acknowledge state.
The transaction pending ack state stats that can be retrieved include:
For `recoverStartTime` and `recoverEndTime`, `0L` means no startup.

Use one of the following ways to get transaction pending ack stats:
<Tabs groupId="api-choice" defaultValue="pulsar-admin" values={[{"label":"pulsar-admin","value":"pulsar-admin"},{"label":"REST API","value":"REST API"},{"label":"Java","value":"Java"}]}>
<TabItem value="pulsar-admin">

```shell
$ pulsar-admin transactions pending-ack-stats \
    -t my-topic -s mysubName -l
```

</TabItem>
<TabItem value="REST API">

{@inject: endpoint|GET|/admin/v3/transactions/pendingAckStats/:tenant/:namespace:/:topic:/:subName|operation/getPendingAckStats?version=@pulsar:version_number@}

</TabItem>
<TabItem value="Java">

```java
admin.transactions().getPendingAckStats(topic, subName, lowWaterMarks);
```

</TabItem>
</Tabs>
The following is an example of the returned values.
{ "state" : "Ready", "lowWaterMarks" : { "1" : 0 }, "ongoingTxnSize" : 1, "recoverStartTime" : 1657021899202, "recoverEndTime" : 1657021899203 }
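If you call the REST endpoint directly, the path placeholders have to be filled in and URL-encoded. The following is a minimal sketch, assuming Java 10 or later, that builds the request URI for pending ack stats. It reads `tenant`, `namespace`, `topic`, and `subName` in the path template as plain placeholders. The base URL `http://localhost:8080` and the class `PendingAckStatsUri` are assumptions for illustration only.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper; the base URL passed in is an assumption, not a Pulsar default guarantee.
final class PendingAckStatsUri {
    private PendingAckStatsUri() {}

    /** Percent-encodes one path segment (URLEncoder emits '+' for spaces, so fix that up). */
    static String segment(String raw) {
        return URLEncoder.encode(raw, StandardCharsets.UTF_8).replace("+", "%20");
    }

    /** Builds the pending ack stats URI from the path template shown in the REST API tab. */
    static URI build(String baseUrl, String tenant, String namespace, String topic, String subName) {
        String path = String.join("/",
                "/admin/v3/transactions/pendingAckStats",
                segment(tenant), segment(namespace), segment(topic), segment(subName));
        return URI.create(baseUrl + path);
    }

    public static void main(String[] args) {
        System.out.println(build("http://localhost:8080", "public", "default", "my-topic", "mysubName"));
    }
}
```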
The transaction pending ack internal stats that can be retrieved include:
See [managedLedgerInternalStats](admin-api-topics.md#get-internal-stats) for more details.

Use one of the following ways to get transaction pending ack internal stats:
<Tabs groupId="api-choice" defaultValue="pulsar-admin" values={[{"label":"pulsar-admin","value":"pulsar-admin"},{"label":"REST API","value":"REST API"},{"label":"Java","value":"Java"}]}>
<TabItem value="pulsar-admin">

```shell
$ pulsar-admin transactions pending-ack-internal-stats \
    -t my-topic -s mysubName -m
```

</TabItem>
<TabItem value="REST API">

{@inject: endpoint|GET|/admin/v3/transactions/pendingAckInternalStats/:tenant/:namespace:/:topic:/:subName|operation/getPendingAckInternalStats?version=@pulsar:version_number@}

</TabItem>
<TabItem value="Java">

```java
admin.transactions().getPendingAckInternalStats(topic, subName, metadata);
```

</TabItem>
</Tabs>
The following is an example of the returned values.
{ "pendingAckLogStats" : { "managedLedgerName" : "public/default/persistent/my-topic-mysubName__transaction_pending_ack", "managedLedgerInternalStats" : { "entriesAddedCounter" : 2247, "numberOfEntries" : 2247, "totalSize" : 37212, "currentLedgerEntries" : 104, "currentLedgerSize" : 1732, "lastLedgerCreatedTimestamp" : "2022-06-30T19:02:09.746+08:00", "waitingCursorsCount" : 0, "pendingAddEntriesCount" : 52, "lastConfirmedEntry" : "64:51", "state" : "LedgerOpened", "ledgers" : [ { "ledgerId" : 56, "entries" : 2195, "size" : 36346, "offloaded" : false, "metadata" : "LedgerMetadata{formatVersion=3, ensembleSize=1, writeQuorumSize=1, ackQuorumSize=1, state=CLOSED, length=36346, lastEntryId=2194, digestType=CRC32C, password=OMITTED, ensembles={0=[10.20.240.119:3181]}, customMetadata={component=base64:bWFuYWdlZC1sZWRnZXI=, pulsar/managed-ledger=base64:cHVibGljL2RlZmF1bHQvcGVyc2lzdGVudC9teS10b3BpYy1teXN1Yk5hbWVfX3RyYW5zYWN0aW9uX3BlbmRpbmdfYWNr, application=base64:cHVsc2Fy}}", "underReplicated" : false }, { "ledgerId" : 64, "entries" : 0, "size" : 0, "offloaded" : false, "metadata" : "LedgerMetadata{formatVersion=3, ensembleSize=1, writeQuorumSize=1, ackQuorumSize=1, state=CLOSED, length=866, lastEntryId=51, digestType=CRC32C, password=OMITTED, ensembles={0=[10.20.240.119:3181]}, customMetadata={component=base64:bWFuYWdlZC1sZWRnZXI=, pulsar/managed-ledger=base64:cHVibGljL2RlZmF1bHQvcGVyc2lzdGVudC9teS10b3BpYy1teXN1Yk5hbWVfX3RyYW5zYWN0aW9uX3BlbmRpbmdfYWNr, application=base64:cHVsc2Fy}}", "underReplicated" : false } ], "cursors" : { "__pending_ack_state" : { "markDeletePosition" : "56:-1", "readPosition" : "56:0", "waitingReadOp" : false, "pendingReadOps" : 0, "messagesConsumedCounter" : 0, "cursorLedger" : 57, "cursorLedgerLastEntry" : 0, "individuallyDeletedMessages" : "[]", "lastLedgerSwitchTimestamp" : "2022-06-30T18:55:26.842+08:00", "state" : "Open", "numberOfEntriesSinceFirstNotAckedMessage" : 1, "totalNonContiguousDeletedMessagesRange" : 0, 
"subscriptionHavePendingRead" : false, "subscriptionHavePendingReplayRead" : false, "properties" : { } } } } } }
The position stats in pending ack include:
If you want to know whether a position has been acknowledged, use one of the following ways to get position stats in pending ack:
<Tabs groupId="api-choice" defaultValue="pulsar-admin" values={[{"label":"pulsar-admin","value":"pulsar-admin"},{"label":"REST API","value":"REST API"},{"label":"Java","value":"Java"}]}>
<TabItem value="pulsar-admin">

```shell
$ pulsar-admin transactions position-stats-in-pending-ack \
    -t my-topic -s mysubName -l 15 -e 6
```

</TabItem>
<TabItem value="REST API">

{@inject: endpoint|GET|/admin/v3/transactions/pendingAckStats /:tenant/:namespace:/:topic:/:subName/:ledgerId/:entryId?batchIndex=batchIndex|operation/getPositionStatsInPendingAck?version=@pulsar:version_number@}

</TabItem>
<TabItem value="Java">

```java
admin.transactions().getPositionStatsInPendingAckAsync(topic, subName, ledgerId, entryId, batchIndex);
```

</TabItem>
</Tabs>
The following is an example of the returned values.
{ "State" : "MarkDelete" }
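Positions elsewhere on this page, such as `cumulativeAckPosition` and `startPosition`, are printed as two numbers separated by a colon. The following is a minimal sketch for splitting such a value, assuming the format is `ledgerId:entryId` and that the two parts correspond to the `-l` and `-e` options of the command above. `PositionText` is a hypothetical helper, not part of the Pulsar API.

```java
// Hypothetical helper; assumes positions are printed as "ledgerId:entryId".
final class PositionText {
    private PositionText() {}

    /** Returns {ledgerId, entryId} parsed from text such as "137:49959". */
    static long[] parse(String position) {
        String[] parts = position.trim().split(":", -1);
        if (parts.length != 2) {
            throw new IllegalArgumentException("Unexpected position format: " + position);
        }
        return new long[] {Long.parseLong(parts[0]), Long.parseLong(parts[1])};
    }

    public static void main(String[] args) {
        long[] p = parse("137:49959"); // sample cumulativeAckPosition from this page
        System.out.println("-l " + p[0] + " -e " + p[1]);
    }
}
```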
Transaction buffer handles messages produced to a topic partition within a transaction. The messages in the transaction buffer are not visible to consumers until the transactions are committed. The messages in the transaction buffer are discarded when the transactions are aborted.
The transaction buffer stats that can be retrieved include:
For `recoverStartTime` and `recoverEndTime`, `0L` means no startup.

Use one of the following ways to get transaction buffer stats:
<Tabs groupId="api-choice" defaultValue="pulsar-admin" values={[{"label":"pulsar-admin","value":"pulsar-admin"},{"label":"REST API","value":"REST API"},{"label":"Java","value":"Java"}]}>
<TabItem value="pulsar-admin">

```shell
$ pulsar-admin transactions transaction-buffer-stats \
    -t my-topic -l
```

</TabItem>
<TabItem value="REST API">

{@inject: endpoint|GET|/admin/v3/transactions/transactionBufferStats/:tenant/:namespace:/:topic:/:subName|operation/getTransactionBufferStats?version=@pulsar:version_number@}

</TabItem>
<TabItem value="Java">

```java
admin.transactions().getTransactionBufferStats(topic, lowWaterMarks);
```

</TabItem>
</Tabs>
The following is an example of the returned values.
{ "state" : "Ready", "maxReadPosition" : "38:101", "lastSnapshotTimestamps" : 1657021903534, "lowWaterMarks" : { "1" : -1, "2" : -1 }, "ongoingTxnSize" : 0, "recoverStartTime" : 1657021892850, "recoverEndTime" : 1657021893372 }