Pulsar adminツールはコマンドラインユーティリティであり、プロパティ、クラスタ、トピック、ネームスペースなどPulsar Brokerのさまざまなエンティティを設定および監視するための、Brokerと管理の間のインターフェースを提供します。これはプロパティ、クラスタ、ネームスペースを作成できるようにし、アプリケーションをPulsarに載せるのに便利な管理ツールです。またトピックの状態と使用方法を管理する際にも役立ちます。CLI (command line interface) の他に、上述のエンティティを管理および監視するためにREST APIとJava APIの2つの代替手段を使うこともできます。そこでこのドキュメントではCLI, REST API, Java APIを使ってPulsarのエンティティを管理する方法について説明します。
前述の通り、Pulsar BrokerはPulsarのさまざまなエンティティを設定および管理するためのREST APIを公開しています。Pulsar admin CLIツールはadminコマンドを実行するために、Javaクライアントを使ってこれらのREST APIをコールします。利用可能なREST APIのリストについての詳細はswaggerドキュメントから学ぶことができます。
Pulsar Brokerが受け取ったリクエストに対して認証認可を行うことが可能であるPulsarのセキュリティ機能については他の章で述べました。adminツールはコマンドのリストを実行するためにPulsar BrokerのREST APIを呼びます。しかし、Pulsar Brokerのセキュリティが有効になっている場合、認証されたリクエストを得るためにBrokerのREST APIを呼ぶ際にadminツールは追加の情報を渡す必要があります。そこでこの情報がconf/client.conf
ファイルにおいて正しく設定されていることを確認してください。
これでPulsar admin CLIツールを使う準備が整いました。adminツールの利用を開始するため、下記のコマンドを実行してください。
$ bin/pulsar-admin --help
前章でPulsarのエンティティを管理するための他の代替手段 - REST APIとJava API - について述べました。このドキュメントではPulsarのエンティティを管理するためのREST APIのエンドポイントとJava APIのスニペットについても説明します。
Pulsar admin REST APIの定義についてはswaggerドキュメントから学ぶことができます。このドキュメントでは各APIの利用方法とadmin CLIコマンドがどのようにREST APIに対応するかについて説明します。
Java APIはorg.apache.pulsar.client.admin.PulsarAdmin
によりアクセスできます。
下記のコードスニペットはPulsarAdminの初期化の方法を示しています。後のドキュメントでPulsarのエンティティを管理するための使用方法について説明します。
URL url = new URL("http://localhost:8080"); String authPluginClassName = "com.org.MyAuthPluginClass"; //Pass auth-plugin class fully-qualified name if Pulsar-security enabled String authParams = "param1=value1";//Pass auth-param if auth-plugin class requires it boolean useTls = false; boolean tlsAllowInsecureConnection = false; String tlsTrustCertsFilePath = null; ClientConfiguration config = new ClientConfiguration(); config.setAuthentication(authPluginClassName, authParams); config.setUseTls(useTls); config.setTlsAllowInsecureConnection(tlsAllowInsecureConnection); config.setTlsTrustCertsFilePath(tlsTrustCertsFilePath); PulsarAdmin admin = new PulsarAdmin(url, config);
主にadminコマンドラインツールを使って下記のBrokerのエンティティにアクセスします。もしまだ下記のエンティティに対する理解が不十分なのであれば、Pulsar入門で詳細を読むことができます。
アクティブなBrokerのリストおよび指定されたBrokerが所有するネームスペースのリストを取得できます。
トラフィックを処理している利用可能でアクティブなBrokerを取得します。
$ pulsar-admin brokers list use
broker1.use.org.com:8080
GET /admin/brokers/{cluster}
admin.brokers().getActiveBrokers(clusterName)
指定したBrokerが所有し処理しているすべてのネームスペースを取得します。
$ pulsar-admin brokers namespaces --url broker1.use.org.com:8080 use
{ "my-property/use/my-ns/0x00000000_0xffffffff": { "broker_assignment": "shared", "is_controlled": false, "is_active": true } }
GET /admin/brokers/{cluster}/{broker}/ownedNamespaces
admin.brokers().getOwnedNamespaces(cluster,brokerUrl)
プロパティはアプリケーションのドメインを表します。例えばfinance, mail, sportsなどがプロパティの例です。ツールを使ってPulsarのプロパティを管理するためのCRUD操作を行うことができます。
Pulsarシステムに存在しているすべてのプロパティのリストを表示します。
$ pulsar-admin properties list
my-property
GET /admin/properties
admin.properties().getProperties()
Pulsarシステムに新しいプロパティを作成します。プロパティに対し、プロパティの管理権限を持つadminロール(コンマ区切り)とプロパティが利用可能なクラスタ(コンマ区切り)を設定できます。
pulsar-admin properties create my-property --admin-roles admin1,admin2 --allowed-clusters cl1,cl2
N/A
PUT /admin/properties/{property}
admin.properties().createProperty(property, propertyAdmin)
指定した既存プロパティの設定を取得します。
$pulsar-admin properties get my-property
{ "adminRoles": [ "admin1", "admin2" ], "allowedClusters": [ "cl1", "cl2" ] }
GET /admin/properties/{property}
admin.properties().getPropertyAdmin(property)
既に作成されたプロパティの設定を更新します。指定した既存プロパティのadminロールとクラスタの情報を更新できます。
$ pulsar-admin properties update my-property --admin-roles admin-update-1,admin-update-2 --allowed-clusters cl1,cl2
N/A
POST /admin/properties/{property}
admin.properties().updateProperty(property, propertyAdmin)
既存プロパティをPulsarシステムから削除します。
$ pulsar-admin properties delete my-property
N/A
DELETE /admin/properties/{property}
admin.properties().deleteProperty(property)
クラスタは一つ以上の地理的位置においてプロパティとそのネームスペースを利用可能にします。クラスタは一般的にはuse, uswなどの地域のコロケーション名に対応します。ツールを使ってPulsarのクラスタを管理するためのCRUD操作を行うことができます。
Pulsarに新しいクラスタを作成します。このようなシステムレベルの操作はスーパーユーザ権限でのみ実行できます。
$ pulsar-admin clusters create --url http://my-cluster.org.com:8080/ --broker-url pulsar://my-cluster.org.com:6650/ cl1
N/A
PUT /admin/clusters/{cluster}
admin.clusters().createCluster(cluster, new ClusterData(serviceUrl, serviceUrlTls, brokerServiceUrl, brokerServiceUrlTls))
指定した既存クラスタの設定を取得します。
$ pulsar-admin clusters get cl1
{ "serviceUrl": "http://my-cluster.org.com:8080/", "serviceUrlTls": null, "brokerServiceUrl": "pulsar://my-cluster.org.com:6650/", "brokerServiceUrlTls": null }
GET /admin/clusters/{cluster}
admin.clusters().getCluster(cluster)
指定した既存クラスタの設定を更新します。
$ pulsar-admin clusters update --url http://my-cluster.org.com:4081/ --broker-url pulsar://my-cluster.org.com:3350/ cl1
N/A
POST /admin/clusters/{cluster}
admin.clusters().updateCluster(cluster, new ClusterData(serviceUrl, serviceUrlTls, brokerServiceUrl, brokerServiceUrlTls))
Pulsarシステムから既存クラスタを削除します。
$ pulsar-admin clusters delete cl1
N/A
DELETE /admin/clusters/{cluster}
admin.clusters().deleteCluster(cluster)
Pulsarシステム内に作成されたすべてのクラスタのリストを取得します。
$ pulsar-admin clusters list
cl1
GET /admin/clusters
admin.clusters().getClusters()
ネームスペースはプロパティ内の論理的な区切りの名称です。一つのプロパティはそのプロパティの下で異なるアプリケーションを管理するために複数のネームスペースを持つことができます。
指定した既存クラスタ内のプロパティにネームスペースを作成します。
$ pulsar-admin namespaces create test-property/cl1/ns1
N/A
PUT /admin/namespaces/{property}/{cluster}/{namespace}
admin.namespaces().createNamespace(namespace)
作成済みネームスペースのポリシー情報を取得します。
$pulsar-admin namespaces policies test-property/cl1/ns1
{ "auth_policies": { "namespace_auth": {}, "destination_auth": {} }, "replication_clusters": [], "bundles_activated": true, "bundles": { "boundaries": [ "0x00000000", "0xffffffff" ], "numBundles": 1 }, "backlog_quota_map": {}, "persistence": null, "latency_stats_sample_rate": {}, "message_ttl_in_seconds": 0, "retention_policies": null, "deleted": false }
GET /admin/namespaces/{property}/{cluster}/{namespace}
admin.namespaces().getPolicies(namespace)
指定したプロパティ内にあるすべての作成済みネームスペースのリストを取得します。
$ pulsar-admin namespaces list test-property
test-property/cl1/ns1
GET /admin/namespaces/{property}
admin.namespaces().getNamespaces(property)
指定したクラスタのプロパティ内にあるすべての作成済みネームスペースのリストを取得します。
$ pulsar-admin namespaces list-cluster test-property/cl1
test-property/cl1/ns1
GET /admin/namespaces/{property}/{cluster}
admin.namespaces().getNamespaces(property, cluster)
既存のネームスペースを削除します。
$ pulsar-admin namespaces delete test-property/cl1/ns1
N/A
DELETE /admin/namespaces/{property}/{cluster}/{namespace}
admin.namespaces().deleteNamespace(namespace)
特定のクライアントロールに対して、produceやconsumeのような必要な操作のリストを許可します。
$ pulsar-admin namespaces grant-permission --actions produce,consume --role admin10 test-property/cl1/ns1
N/A
POST /admin/namespaces/{property}/{cluster}/{namespace}/permissions/{role}
admin.namespaces().grantPermissionOnNamespace(namespace, role, getAuthActions(actions))
指定したネームスペースに対して作成されたパーミッションルールを表示します。
$ pulsar-admin namespaces permissions test-property/cl1/ns1
{ "admin10": [ "produce", "consume" ] }
GET /admin/namespaces/{property}/{cluster}/{namespace}/permissions
admin.namespaces().getPermissions(namespace)
特定のクライアントロールのパーミッションを剥奪し、指定したネームスペースにアクセスできないようにします。
$ pulsar-admin namespaces revoke-permission --role admin10 test-property/cl1/ns1
N/A
DELETE /admin/namespaces/{property}/{cluster}/{namespace}/permissions/{role}
admin.namespaces().revokePermissionsOnNamespace(namespace, role)
ネームスペースにレプリケーションクラスタを設定し、Pulsarが内部的に発行されたメッセージを一つのコロケーションから別のコロケーションにレプリケートできるようにします。しかし、レプリケーションクラスタをセットするためにはネームスペースは*test-property/global/ns1.のようにグローバルである必要があります。つまりクラスタ名は“global”*でなければなりません。
$ pulsar-admin namespaces set-clusters --clusters cl2 test-property/cl1/ns1
N/A
POST /admin/namespaces/{property}/{cluster}/{namespace}/replication
admin.namespaces().setNamespaceReplicationClusters(namespace, clusters)
指定したネームスペースのレプリケーションクラスタのリストを取得します。
$ pulsar-admin namespaces get-clusters test-property/cl1/ns1
cl2
GET /admin/namespaces/{property}/{cluster}/{namespace}/replication
admin.namespaces().getNamespaceReplicationClusters(namespace)
バックログクォータはBrokerが特定の閾値に達したとき、ネームスペースの帯域幅/ストレージを制限するのに役立ちます。管理者はこの制限と制限に達したときに行う下記のアクションの内一つを設定できます。
producer_request_hold: Brokerはproduceリクエストのペイロードをホールドし、永続化しないようになります
producer_exception: Brokerは例外を発生させてクライアントとの接続を切断します
consumer_backlog_eviction: Brokerはバックログメッセージの破棄を開始します
バックログクォータ制限はバックログクォータタイプ: destination_storageを定義することによって考慮されるようになります。
$ pulsar-admin namespaces set-backlog-quota --limit 10 --policy producer_request_hold test-property/cl1/ns1
N/A
POST /admin/namespaces/{property}/{cluster}/{namespace}/backlogQuota
admin.namespaces().setBacklogQuota(namespace, new BacklogQuota(limit, policy))
指定したネームスペースのバックログクォータ設定を表示します。
$ pulsar-admin namespaces get-backlog-quotas test-property/cl1/ns1
{ "destination_storage": { "limit": 10, "policy": "producer_request_hold" } }
GET /admin/namespaces/{property}/{cluster}/{namespace}/backlogQuotaMap
admin.namespaces().getBacklogQuotaMap(namespace)
指定したネームスペースのバックログクォータポリシーを削除します。
$ pulsar-admin namespaces remove-backlog-quota test-property/cl1/ns1
N/A
DELETE /admin/namespaces/{property}/{cluster}/{namespace}/backlogQuota
admin.namespaces().removeBacklogQuota(namespace, backlogQuotaType)
永続性ポリシーは指定したネームスペースにあるすべてのトピックの永続性レベルを設定できます。
Bookkeeper-ack-quorum: 各エントリに対して書き込み成功のAckを待機するBookieの数(保証されるコピーの数)、デフォルト: 0
Bookkeeper-ensemble: 一つのトピックに対して使用されるBookieの数、デフォルト: 0
Bookkeeper-write-quorum: 各エントリに対して書き込みを行うBookieの数、デフォルト: 0
Ml-mark-delete-max-rate: mark-delete操作のスロットル率 (0は無制限)、デフォルト: 0.0
$ pulsar-admin namespaces set-persistence --bookkeeper-ack-quorum 2 --bookkeeper-ensemble 3 --bookkeeper-write-quorum 2 --ml-mark-delete-max-rate 0 test-property/cl1/ns1
N/A
POST /admin/persistent/{property}/{cluster}/{namespace}/persistence
admin.namespaces().setPersistence(namespace,new PersistencePolicies(bookkeeperEnsemble, bookkeeperWriteQuorum,bookkeeperAckQuorum,managedLedgerMaxMarkDeleteRate))
指定したネームスペースの永続性ポリシーの設定を表示します。
$ pulsar-admin namespaces get-persistence test-property/cl1/ns1
{ "bookkeeperEnsemble": 3, "bookkeeperWriteQuorum": 2, "bookkeeperAckQuorum": 2, "managedLedgerMaxMarkDeleteRate": 0 }
GET /admin/namespaces/{property}/{cluster}/{namespace}/persistence
admin.namespaces().getPersistence(namespace)
ネームスペースBundleは同じネームスペースに属するトピックの仮想的なグループです。多数のBundleによりBrokerの負荷が高まった場合、このコマンドを使って処理が重いBundleをそのBrokerから取り外し、より負荷が小さい他のBrokerに扱わせることができます。ネームスペースBundleは0x00000000と0xffffffffのように開始と終了のレンジによって定義されます。
$ pulsar-admin namespaces unload --bundle 0x00000000_0xffffffff test-property/pstg-gq1/ns1
N/A
PUT /admin/namespaces/{property}/{cluster}/{namespace}/unload
admin.namespaces().unloadNamespaceBundle(namespace, bundle)
メッセージの生存時間(秒)を設定します。
$ pulsar-admin namespaces set-message-ttl --messageTTL 100 test-property/cl1/ns1
N/A
POST /admin/namespaces/{property}/{cluster}/{namespace}/messageTTL
admin.namespaces().setNamespaceMessageTTL(namespace, messageTTL)
ネームスペースに対して設定されたメッセージTTLを取得します。
$ pulsar-admin namespaces get-message-ttl test-property/cl1/ns1
100
GET /admin/namespaces/{property}/{cluster}/{namespace}/messageTTL
admin.namespaces().getNamespaceReplicationClusters(namespace)
各ネームスペースのBundleは複数のトピックを含み、各Bundleはただ一つのBrokerによって扱われます。Bundleがそれに含まれる複数のトピックの処理で重くなった場合、Brokerに対し負荷を発生させます。これを解決するため、管理者はこのコマンドを用いてBundleを分割できます。
$ pulsar-admin namespaces split-bundle --bundle 0x00000000_0xffffffff test-property/cl1/ns1
N/A
PUT /admin/namespaces/{property}/{cluster}/{namespace}/{bundle}/split
admin.namespaces().splitNamespaceBundle(namespace, bundle)
指定したネームスペースに属するすべてのトピックのすべてのメッセージバックログを削除します。特定のサブスクリプションのバックログのみを削除することも可能です。
$ pulsar-admin namespaces clear-backlog --sub my-subscription test-property/pstg-gq1/ns1
N/A
POST /admin/namespaces/{property}/{cluster}/{namespace}/clearBacklog
admin.namespaces().clearNamespaceBacklogForSubscription(namespace, subscription)
特定のネームスペースBundleに属するすべてのトピックのすべてのメッセージバックログを削除します。特定のサブスクリプションのバックログのみを削除することも可能です。
$ pulsar-admin namespaces clear-backlog --bundle 0x00000000_0xffffffff --sub my-subscription test-property/pstg-gq1/ns1
N/A
POST /admin/namespaces/{property}/{cluster}/{namespace}/{bundle}/clearBacklog
admin.namespaces().clearNamespaceBundleBacklogForSubscription(namespace, bundle, subscription)
各ネームスペースは複数のトピックを含み、各トピックのリテンションサイズ(ストレージサイズ)は特定の閾値を超えるべきではなく、特定の期間まで保持されるべきです。このコマンドを使って、指定したネームスペース内のトピックのリテンションサイズと時間を設定できます。
$ pulsar-admin set-retention --size 10 --time 100 test-property/cl1/ns1
N/A
POST /admin/namespaces/{property}/{cluster}/{namespace}/retention
admin.namespaces().setRetention(namespace, new RetentionPolicies(retentionTimeInMin, retentionSizeInMB))
指定したネームスペースのリテンション情報を表示します。
$ pulsar-admin namespaces get-retention test-property/cl1/ns1
{ "retentionTimeInMinutes": 10, "retentionSizeInMB": 100 }
GET /admin/namespaces/{property}/{cluster}/{namespace}/retention
admin.namespaces().getRetention(namespace)
persistentコマンドは、メッセージをproduce/consumeするための論理的なエンドポイントであるトピックにアクセスする際に役立ちます。
Producerはトピックにメッセージをproduceし、Consumerはトピックにproduceされたメッセージをconsumeするためにトピックを購読します。
以降に説明とコマンドを記載します - パーシステントトピックのフォーマットは次の通りです:
persistent://<property_name> <cluster_name> <namespace_name> <topic-name>
指定されたネームスペース下にパーティションドトピックを作成します。作成のためには、パーティション数は1より大きくなくてはいけません。
$ pulsar-admin persistent create-partitioned-topic --partitions 2 persistent://test-property/cl1/ns1/pt1
N/A
PUT /admin/persistent/{property}/{cluster}/{namespace}/{destination}/partitions
admin.persistentTopics().createPartitionedTopic(persistentTopic, numPartitions)
作成されたパーティションドトピックのメタデータを提供します。
$ pulsar-admin persistent get-partitioned-topic-metadata persistent://test-property/cl1/ns1/pt1
{ "partitions": 2 }
GET /admin/persistent/{property}/{cluster}/{namespace}/{destination}/partitions
admin.persistentTopics().getPartitionedTopicMetadata(persistentTopic)
作成されたパーティションドトピックを削除します。
$ pulsar-admin persistent delete-partitioned-topic persistent://test-property/cl1/ns1/pt1
N/A
DELETE /admin/persistent/{property}/{cluster}/{namespace}/{destination}/partitions
admin.persistentTopics().deletePartitionedTopic(persistentTopic)
トピックを削除します。ただしアクティブなサブスクリプションまたはProducerの接続がある場合には、トピックを削除できません。
pulsar-admin persistent delete persistent://test-property/cl1/ns1/my-topic
N/A
DELETE /admin/persistent/{property}/{cluster}/{namespace}/{destination}
admin.persistentTopics().delete(persistentTopic)
指定されたネームスペース下に存在するパーシステントトピックのリストを提供します。
$ pulsar-admin persistent list test-property/cl1/ns1
my-topic
GET /admin/persistent/{property}/{cluster}/{namespace}
admin.persistentTopics().getList(namespace)
指定されたトピックに対して特定のアクションを実行するためのパーミッションをクライアントロールに付与します。
$ pulsar-admin persistent grant-permission --actions produce,consume --role application1 persistent://test-property/cl1/ns1/tp1
N/A
POST /admin/persistent/{property}/{cluster}/{namespace}/{destination}/permissions/{role}
admin.persistentTopics().grantPermission(destination, role, getAuthActions(actions))
指定されたトピックに対してのクライアントロールのパーミッションのリストを表示します。
$ pulsar-admin permissions persistent://test-property/cl1/ns1/tp1
{ "application1": [ "consume", "produce" ] }
GET /admin/persistent/{property}/{cluster}/{namespace}/{destination}/permissions
admin.persistentTopics().getPermissions(destination)
クライアントロールに対して付与されたパーミッションを剥奪します。
$ pulsar-admin persistent revoke-permission --role application1 persistent://test-property/cl1/ns1/tp1
N/A
DELETE /admin/persistent/{property}/{cluster}/{namespace}/{destination}/permissions/{role}
admin.persistentTopics().revokePermissions(destination, role)
パーティションドトピックの現在の統計情報を表示します。
$ pulsar-admin persistent partitioned-stats --per-partition persistent://test-property/cl1/ns1/tp1
{ "msgRateIn": 4641.528542257553, "msgThroughputIn": 44663039.74947473, "msgRateOut": 0, "msgThroughputOut": 0, "averageMsgSize": 1232439.816728665, "storageSize": 135532389160, "publishers": [ { "msgRateIn": 57.855383881403576, "msgThroughputIn": 558994.7078932219, "averageMsgSize": 613135, "producerId": 0, "producerName": null, "address": null, "connectedSince": null } ], "subscriptions": { "my-topic_subscription": { "msgRateOut": 0, "msgThroughputOut": 0, "msgBacklog": 116632, "type": null, "msgRateExpired": 36.98245516804671, "consumers": [] } }, "replication": {} }
GET /admin/persistent/{property}/{cluster}/{namespace}/{destination}/partitioned-stats
admin.persistentTopics().getPartitionedStats(persistentTopic, perPartition)
パーティションドトピックではないトピックの現在の統計情報を表示します。
msgRateIn: 全てのローカルとレプリケーション用のPublisherの発行レートの合計で、1秒あたりのメッセージ数です。
msgThroughputIn: 上記と同様ですが、1秒あたりのバイト数です。
msgRateOut: 全てのローカルとレプリケーション用のConsumerへの配送レートの合計で、1秒あたりのメッセージ数です。
msgThroughputOut: 上記と同様ですが、1秒あたりのバイト数です。
averageMsgSize: 直近のインターバル内で発行されたメッセージの平均バイトサイズです。
storageSize: このトピックのLedgerのストレージサイズの合計です。
publishers: トピック内の全てのローカルPublisherの一覧です。0または何千もの可能性があります。
averageMsgSize: 直近のインターバル内でこのPublisherからのメッセージの平均バイトサイズです。
producerId: このトピック上での、このProducerの内部的な識別子です。
producerName: クライアントライブラリによって生成されたこのProducerの内部的な識別子です。
address: このProducerの接続用のIPアドレスと送信元ポートです。
connectedSince: このProducerが作成または最後に再接続したタイムスタンプです。
subscriptions: トピックに対してのローカルの全サブスクリプションリストです。
my-subscription: このサブスクリプションの名前です (クライアントが定義します) 。
msgBacklog: このサブスクリプションのバックログ内のメッセージ数です。
type: このサブスクリプションのタイプです。
msgRateExpired: TTLのためにこのサブスクリプションから配送されずに破棄されたメッセージのレートです。
consumers: このサブスクリプションに接続しているConsumerリストです。
consumerName: クライアントライブラリによって生成されたこのConsumerの内部的な識別子です。
availablePermits: このConsumerがクライアントライブラリのlistenキューに格納できるメッセージ数です。0はクライアントライブラリのキューがいっぱいであり、receive()はコールされないことを意味します。0でない場合には、このConsumerはメッセージを配送される準備ができています。
replication: このセクションは、トピックのクラスタ間でのレプリケーションの統計情報を示します。
replicationBacklog: レプリケーション先のバックログに送信されるメッセージです。
connected: 送信レプリケータが接続されているかどうかです。
replicationDelayInSeconds: connectedがtrueの場合で、最も古いメッセージが送信されるのを待っている時間です。
inboundConnection: このBrokerに対しての、リモートクラスタのPublisher接続におけるそのBrokerのIPとポートです。
inboundConnectedSince: リモートクラスタにメッセージを発行するためにTCP接続が使われます。もし接続しているローカルのPublisherがいない場合には、この接続は数分後に自動的に閉じられます。
$ pulsar-admin persistent stats persistent://test-property/cl1/ns1/tp1
{ "msgRateIn": 0, "msgThroughputIn": 0, "msgRateOut": 0, "msgThroughputOut": 0, "averageMsgSize": 0, "storageSize": 11814, "publishers": [ { "msgRateIn": 0, "msgThroughputIn": 0, "averageMsgSize": 0, "producerId": 0, "producerName": "gq1-54-4001", "address": "/10.215.138.238:44458", "connectedSince": "2016-06-16 22:56:56.509" } ], "subscriptions": { "my-subscription": { "msgRateOut": 0, "msgThroughputOut": 0, "msgBacklog": 17, "type": "Shared", "msgRateExpired": 2.1771406267194497, "consumers": [ { "msgRateOut": 0, "msgThroughputOut": 0, "consumerName": "a67f7", "availablePermits": 1186, "address": "/10.215.166.121:35095", "connectedSince": "2016-06-25 00:05:58.312" } ] } }, "replication": {} }
GET /admin/persistent/{property}/{cluster}/{namespace}/{destination}/stats
admin.persistentTopics().getStats(persistentTopic)
トピックの詳細な統計情報を表示します。
entriesAddedCounter: このBrokerがこのトピックを読み込んでから発行されたメッセージ数です。
numberOfEntries: 書き込まれたメッセージの総数です。
totalSize: 全メッセージのバイト単位での合計ストレージサイズです。
currentLedgerEntries: 現在openしているLedgerに書き込まれたメッセージ数です。
currentLedgerSize: 現在openしているLedgerに書き込まれたメッセージのバイトサイズです。
lastLedgerCreatedTimestamp: 最後のLedgerが作成された時刻です。
lastLedgerCreationFailureTimestamp: 最後のLedgerに障害が発生した時刻です。
waitingCursorsCount:“キャッチアップ状態”で新しいメッセージが発行されるのを待っているカーソル数です。
pendingAddEntriesCount: 完了を待っている (非同期) 書き込みリクエストのメッセージ数です。
lastConfirmedEntry: 書き込みに成功した最後のメッセージのledgerid:entryid。entryidが−1の場合、Ledgerがすでにオープンされているか現在オープンされていますが、まだ書き込まれたエントリがないことを意味します。
state: このLedgerの書き込みのための状態です。LedgerOpenedは、発行されたメッセージを保存するためのLedgerをオープンしていることを意味します。
ledgers: このトピックのメッセージを保持している全てのLedgerの順序付きリストです。
cursors: このトピック上の全てのカーソルのリストです。トピックの統計情報上に表示されたサブスクリプションごとに1つ表示されます。
markDeletePosition: Ackのポジション:SubscriberからAckが返された最後のメッセージです。
readPosition: メッセージを読むためのSubscriberの最新のポジションです。
waitingReadOp: サブスクリプションが最新のメッセージを読み込み、新しいメッセージが発行されるのを待っている時に、これはtrueになります。
pendingReadOps: 進行中のBookKeeperへの未解決の読み取りリクエスト数です。
messagesConsumedCounter: このBrokerがこのトピックを読み込んでからこのカーソルがAckしたメッセージ数です。
cursorLedger: 永続的に現在のmarkDeletePositionを保存するために利用されているLedgerです。
cursorLedgerLastEntry: 永続的に現在のmarkDeletePositionを保存するために使われた最後のentryidです。
individuallyDeletedMessages: もしAckが順不同で行われている場合に、markDeletePositionと読み込みポジションの間でAckされたメッセージの範囲を表示します。
lastLedgerSwitchTimestamp: カーソルLedgerがロールオーバされた最後の時刻です。
state: カーソルLedgerの状態: Openは、markDeletePositionのアップデートを保存するためのカーソルLedgerが存在することを意味します。
$ pulsar-admin persistent stats-internal persistent://test-property/cl1/ns1/tp1
{ "entriesAddedCounter": 20449518, "numberOfEntries": 3233, "totalSize": 331482, "currentLedgerEntries": 3233, "currentLedgerSize": 331482, "lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825", "lastLedgerCreationFailureTimestamp": null, "waitingCursorsCount": 1, "pendingAddEntriesCount": 0, "lastConfirmedEntry": "324711539:3232", "state": "LedgerOpened", "ledgers": [ { "ledgerId": 324711539, "entries": 0, "size": 0 } ], "cursors": { "my-subscription": { "markDeletePosition": "324711539:3133", "readPosition": "324711539:3233", "waitingReadOp": true, "pendingReadOps": 0, "messagesConsumedCounter": 20449501, "cursorLedger": 324702104, "cursorLedgerLastEntry": 21, "individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]", "lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313", "state": "Open" } } }
GET /admin/persistent/{property}/{cluster}/{namespace}/{destination}/internalStats
admin.persistentTopics().getInternalStats(persistentTopic)
指定されたトピックの特定のサブスクリプションのNつのメッセージを覗き見ます。
$ pulsar-admin persistent peek-messages --count 10 --subscription my-subscription persistent://test-property/cl1/ns1/my-topic
Message ID: 315674752:0 Properties: { "X-Pulsar-publish-time" : "2015-07-13 17:40:28.451" } msg-payload
GET /admin/persistent/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/position/{messagePosition}
admin.persistentTopics().peekMessages(persistentTopic, subName, numMessages)
指定されたトピックの指定されたサブスクリプションのNつのメッセージをスキップします。
$ pulsar-admin persistent skip --count 10 --subscription my-subscription persistent://test-property/cl1/ns1/my-topic
N/A
POST /admin/persistent/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/skip/{numMessages}
admin.persistentTopics().skipMessages(persistentTopic, subName, numMessages)
指定されたトピックの特定のサブスクリプションの全ての古いメッセージをスキップします。
$ pulsar-admin persistent skip-all --subscription my-subscription persistent://test-property/cl1/ns1/my-topic
N/A
POST /admin/persistent/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/skip_all
admin.persistentTopics().skipAllMessages(persistentTopic, subName)
指定された有効期限 (秒単位) よりも古い、指定されたトピック上の特定のサブスクリプションのメッセージを有効期限切れにします。
$ pulsar-admin persistent expire-messages --subscription my-subscription --expireTime 120 persistent://test-property/cl1/ns1/my-topic
N/A
POST /admin/persistent/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/expireMessages/{expireTimeInSeconds}
admin.persistentTopics().expireMessages(persistentTopic, subName, expireTimeInSeconds)
指定された有効期限 (秒単位) よりも古い、トピック上の全てのサブスクリプションのメッセージを有効期限切れにします。
$ pulsar-admin persistent expire-messages-all-subscriptions --expireTime 120 persistent://test-property/cl1/ns1/my-topic
N/A
POST /admin/persistent/{property}/{cluster}/{namespace}/{destination}/all_subscription/expireMessages/{expireTimeInSeconds}
admin.persistentTopics().expireMessagesForAllSubscriptions(persistentTopic, expireTimeInSeconds)
サブスクリプションのカーソル位置をX分前に記録された位置まで戻します。 これは、X分前の時間とカーソル位置を計算し、その位置にリセットします。
$ pulsar-admin persistent reset-cursor --subscription my-subscription --time 10 persistent://test-property/pstg-gq1/ns1/my-topic
N/A
POST /admin/persistent/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/resetcursor/{timestamp}
admin.persistentTopics().resetCursor(persistentTopic, subName, timestamp)
指定されたトピックに対応しているBrokerのurlを探します。
$ pulsar-admin persistent lookup persistent://test-property/pstg-gq1/ns1/my-topic
"pulsar://broker1.org.com:4480"
GET http://<broker-url>:<port>/lookup/v2/destination/persistent/{property}/{cluster}/{namespace}/{dest} (\* this api serves by “lookup” resource and not “persistent”)
admin.lookups().lookupDestination(destination)
指定されたトピックの全てのサブスクリプション名を表示します。
$ pulsar-admin persistent subscriptions persistent://test-property/pstg-gq1/ns1/my-topic
my-subscription
GET /admin/persistent/{property}/{cluster}/{namespace}/{destination}/subscriptions
admin.persistentTopics().getSubscriptions(persistentTopic)
これ以上メッセージを処理しないサブスクリプションを購読解除する際にも役立ちます。
$pulsar-admin persistent unsubscribe --subscription my-subscription persistent://test-property/pstg-gq1/ns1/my-topic
N/A
DELETE /admin/persistent/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}
admin.persistentTopics().deleteSubscription(persistentTopic, subName)
ネームスペースの隔離ポリシーを作成します。
auto-failover-policy-params: 自動フェイルオーバーポリシーのパラメータで、カンマ区切りでname=value形式で指定します。
auto-failover-policy-type: 自動フェイルオーバーポリシーのタイプ名です。
namespaces: カンマ区切りのネームスペースの正規表現リストです。
primary: カンマ区切りのプライマリBrokerの正規表現リストです。
secondary: カンマ区切りのセカンダリBrokerの正規表現リストです。
$ pulsar-admin ns-isolation-policy --auto-failover-policy-params min_limit=0 --auto-failover-policy-type min_available --namespaces test-property/cl1/ns.*|test-property/cl1/test-ns*.* --secondary broker2.* --primary broker1.* cl1 ns-is-policy1
N/A
POST /admin/clusters/{cluster}/namespaceIsolationPolicies/{policyName}
admin.clusters().createNamespaceIsolationPolicy(clusterName, policyName, namespaceIsolationData);
ネームスペースの隔離ポリシーを表示します。
$ pulsar-admin ns-isolation-policy get cl1 ns-is-policy1
{ "namespaces": [ "test-property/cl1/ns.*|test-property/cl1/test-ns*.*" ], "primary": [ "broker1.*" ], "secondary": [ "broker2.*" ], "auto_failover_policy": { "policy_type": "min_available", "parameters": { "min_limit": "0" } } }
GET /admin/clusters/{cluster}/namespaceIsolationPolicies/{policyName}
admin.clusters().getNamespaceIsolationPolicy(clusterName, policyName)
ネームスペースの隔離ポリシーを削除します。
$ pulsar-admin ns-isolation-policy delete ns-is-policy1
N/A
DELETE /admin/clusters/{cluster}/namespaceIsolationPolicies/{policyName}
admin.clusters().deleteNamespaceIsolationPolicy(clusterName, policyName)
指定されたクラスタによって提供されているネームスペースの隔離ポリシーの全リストを表示します。
$ pulsar-admin ns-isolation-policy list cl1
{ "ns-is-policy1": { "namespaces": [ "test-property/cl1/ns.*|test-property/cl1/test-ns*.*" ], "primary": [ "broker1.*" ], "secondary": [ "broker2.*" ], "auto_failover_policy": { "policy_type": "min_available", "parameters": { "min_limit": "0" } } } }
GET /admin/clusters/{cluster}/namespaceIsolationPolicies
admin.clusters().getNamespaceIsolationPolicies(clusterName)
指定されたネームスペースBundleに対して独自の割り当て情報をセットします。
$ pulsar-admin resource-quotas set --bandwidthIn 10 --bandwidthOut 10 --bundle 0x00000000_0xffffffff --memory 10 --msgRateIn 10 --msgRateOut 10 --namespace test-property/cl1/ns1
N/A
POST /admin/resource-quotas/{property}/{cluster}/{namespace}/{bundle}
admin.resourceQuotas().setNamespaceBundleResourceQuota(namespace, bundle, quota)
リソース割り当ての情報を表示します。
$ pulsar-admin resource-quotas get --bundle 0x00000000_0xffffffff --namespace test-property/cl1/my-topic
{ "msgRateIn": 80.40352101165782, "msgRateOut": 132.58187392933146, "bandwidthIn": 144273.8819600397, "bandwidthOut": 234497.9190227951, "memory": 199.91739142481595, "dynamic": true }
GET /admin/resource-quotas/{property}/{cluster}/{namespace}/{bundle}
admin.resourceQuotas().getNamespaceBundleResourceQuota(namespace, bundle)
独自のリソース割り当てをデフォルトの設定に戻します。
$ pulsar-admin resource-quotas reset-namespace-bundle-quota --bundle 0x00000000_0xffffffff --namespace test-property/cl1/my-topic
N/A
DELETE /admin/resource-quotas/{property}/{cluster}/{namespace}/{bundle}
admin.resourceQuotas().resetNamespaceBundleResourceQuota(namespace, bundle)
Pulsarは任意のトピック上でメッセージのproduceとconsumeを行うためのJavaのAPIを提供しています。 しかし、Pulsarではトピック上でのメッセージのproduceとconsumeに役立つCLIユーティリティも提供しています。
ターミナル上で次のディレクトリに移動して、クライアントツールを試してみてください。
$ $PULSAR_HOME/bin
$ ./pulsar-client --help