You can use Pulsar's admin API to create and manage non-partitioned topics.
In all of the instructions and commands below, the topic name structure is:
persistent://tenant/namespace/topic
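To make the name structure above concrete, here is a minimal sketch that splits a full topic name into its parts. The class `TopicNameParts` and its `parse` method are hypothetical helpers written for illustration; they are not part of the Pulsar client or admin libraries, and they assume the three-level `tenant/namespace/topic` layout used in this document.

```java
import java.util.Objects;

/** Illustrative helper (not a Pulsar API) that splits "domain://tenant/namespace/topic". */
final class TopicNameParts {
    final String domain;     // e.g. "persistent"
    final String tenant;     // e.g. "my-tenant"
    final String namespace;  // e.g. "my-namespace"
    final String topic;      // e.g. "my-topic"

    private TopicNameParts(String domain, String tenant, String namespace, String topic) {
        this.domain = domain;
        this.tenant = tenant;
        this.namespace = namespace;
        this.topic = topic;
    }

    /** Parses a full topic name; throws IllegalArgumentException if a part is missing. */
    static TopicNameParts parse(String fullName) {
        Objects.requireNonNull(fullName, "fullName");
        int sep = fullName.indexOf("://");
        if (sep <= 0) {
            throw new IllegalArgumentException("Missing domain prefix: " + fullName);
        }
        String domain = fullName.substring(0, sep);
        // Limit of 3 keeps any further '/' characters inside the topic part.
        String[] parts = fullName.substring(sep + 3).split("/", 3);
        if (parts.length != 3 || parts[0].isEmpty() || parts[1].isEmpty() || parts[2].isEmpty()) {
            throw new IllegalArgumentException("Expected tenant/namespace/topic: " + fullName);
        }
        return new TopicNameParts(domain, parts[0], parts[1], parts[2]);
    }

    public static void main(String[] args) {
        TopicNameParts p = parse("persistent://my-tenant/my-namespace/my-topic");
        // prints "my-tenant / my-namespace / my-topic"
        System.out.println(p.tenant + " / " + p.namespace + " / " + p.topic);
    }
}
```

Checking the name up front like this can surface a malformed argument before any admin call is made.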
Non-partitioned topics in Pulsar must be explicitly created. When creating a new non-partitioned topic, you need to provide a name for the topic.
Note
By default, a topic is considered inactive 60 seconds after it is created and is then deleted automatically, which prevents trash data from accumulating.
To disable this feature, set brokerDeleteInactiveTopicsEnabled to false. To change how often inactive topics are checked, set brokerDeleteInactiveTopicsFrequencySeconds to your desired value. For more information about these two parameters, see the broker configuration reference.
You can create non-partitioned topics using the create command and specifying the topic name as an argument. Here's an example:
$ bin/pulsar-admin topics create \
    persistent://my-tenant/my-namespace/my-topic
Note
A non-partitioned topic whose name ends with the suffix '-partition-' followed by a numeric value, such as 'xyz-topic-partition-10', can be created only if a partitioned topic with the same base name (here, 'xyz-topic') already exists and has more partitions than that numeric value (here, at least 11, because partition indexes start from 0). Otherwise, creating the topic fails.
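The naming rule in the note above can be sketched as a small check. This is only an illustration of the rule as described here, not the broker's actual validation code; the class `PartitionSuffixRule`, its methods, and the convention of passing 0 when no partitioned topic with the base name exists are all assumptions made for the example.

```java
import java.math.BigInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative check (not a Pulsar API) for names ending in "-partition-<number>". */
final class PartitionSuffixRule {
    // Group 1 is the base topic name, group 2 is the numeric partition index.
    private static final Pattern SUFFIX = Pattern.compile("^(.+)-partition-(\\d+)$");

    /** Returns the base name for a suffixed topic, or null if the name has no such suffix. */
    static String baseName(String topicName) {
        Matcher m = SUFFIX.matcher(topicName);
        return m.matches() ? m.group(1) : null;
    }

    /**
     * @param topicName          local topic name, e.g. "xyz-topic-partition-10"
     * @param existingPartitions partition count of the partitioned topic with the same
     *                           base name, or 0 if no such topic exists (assumed convention)
     */
    static boolean creationAllowed(String topicName, int existingPartitions) {
        Matcher m = SUFFIX.matcher(topicName);
        if (!m.matches()) {
            return true; // No reserved suffix, so the rule does not apply.
        }
        // BigInteger avoids overflow on very long digit runs.
        BigInteger index = new BigInteger(m.group(2));
        // Indexes start at 0, so index N requires at least N + 1 partitions.
        return index.compareTo(BigInteger.valueOf(existingPartitions)) < 0;
    }

    public static void main(String[] args) {
        System.out.println(creationAllowed("xyz-topic-partition-10", 11)); // prints "true"
        System.out.println(creationAllowed("xyz-topic-partition-10", 10)); // prints "false"
    }
}
```

In practice the partition count would come from looking up the partitioned topic named by `baseName`; that lookup is left out of the sketch.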
{@inject: endpoint|PUT|/admin/v2/:schema/:tenant/:namespace/:topic|operation/createNonPartitionedTopic?version=[[pulsar:version_number]]}
String topicName = "persistent://my-tenant/my-namespace/my-topic";
admin.topics().createNonPartitionedTopic(topicName);
Non-partitioned topics can be deleted using the delete command, specifying the topic by name:
$ bin/pulsar-admin topics delete \
    persistent://my-tenant/my-namespace/my-topic
{@inject: endpoint|DELETE|/admin/v2/:schema/:tenant/:namespace/:topic|operation/deleteTopic?version=[[pulsar:version_number]]}
String topicName = "persistent://my-tenant/my-namespace/my-topic";
admin.topics().delete(topicName);
You can list the topics that exist under a given namespace using the list command:
$ pulsar-admin topics list tenant/namespace
persistent://tenant/namespace/topic1
persistent://tenant/namespace/topic2
{@inject: endpoint|GET|/admin/v2/:schema/:tenant/:namespace|operation/getList?version=[[pulsar:version_number]]}
String namespace = "my-tenant/my-namespace";
admin.topics().getList(namespace);
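The REST endpoints for create (PUT), delete (DELETE), and list (GET) share one path shape, which the following sketch builds from a topic or namespace name. It assumes that the `:schema` placeholder in the endpoint templates is filled with the topic domain (`persistent` in this document) and that all names are already URL-safe, so no encoding is applied. `AdminRestPaths` is a hypothetical helper, not a Pulsar class.

```java
/** Illustrative path builder (not a Pulsar API) for the admin v2 topic endpoints. */
final class AdminRestPaths {
    private static final String PREFIX = "/admin/v2/";

    /** Maps "persistent://tenant/namespace/topic" to "/admin/v2/persistent/tenant/namespace/topic". */
    static String topicPath(String fullTopicName) {
        int sep = fullTopicName.indexOf("://");
        if (sep <= 0) {
            throw new IllegalArgumentException("Missing domain prefix: " + fullTopicName);
        }
        String domain = fullTopicName.substring(0, sep);
        String rest = fullTopicName.substring(sep + 3); // tenant/namespace/topic
        // Assumption: names are URL-safe, so no percent-encoding is applied here.
        return PREFIX + domain + "/" + rest;
    }

    /** Maps a domain and "tenant/namespace" to "/admin/v2/<domain>/tenant/namespace". */
    static String namespacePath(String domain, String namespace) {
        return PREFIX + domain + "/" + namespace;
    }

    public static void main(String[] args) {
        String topic = "persistent://my-tenant/my-namespace/my-topic";
        System.out.println("PUT    " + topicPath(topic));    // create
        System.out.println("DELETE " + topicPath(topic));    // delete
        System.out.println("GET    " + namespacePath("persistent", "my-tenant/my-namespace")); // list
    }
}
```

The resulting path would be appended to the address of the broker's web service, which depends on your deployment.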