blob: 2a5ab68b1e5d6ad50fa5f61f96ba379f35e169ee [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.admin.cli;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import com.beust.jcommander.IUsageFormatter;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.beust.jcommander.converters.CommaParameterSplitter;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.util.RelativeTimeUtil;
@Parameters(commandDescription = "Operations on persistent topics")
public class CmdTopics extends CmdBase {
private Topics topics;
public CmdTopics(Supplier<PulsarAdmin> admin) {
super("topics", admin);
jcommander.addCommand("list", new ListCmd());
jcommander.addCommand("list-partitioned-topics", new PartitionedTopicListCmd());
jcommander.addCommand("permissions", new Permissions());
jcommander.addCommand("grant-permission", new GrantPermissions());
jcommander.addCommand("revoke-permission", new RevokePermissions());
jcommander.addCommand("lookup", new Lookup());
jcommander.addCommand("partitioned-lookup", new PartitionedLookup());
jcommander.addCommand("bundle-range", new GetBundleRange());
jcommander.addCommand("delete", new DeleteCmd());
jcommander.addCommand("unload", new UnloadCmd());
jcommander.addCommand("subscriptions", new ListSubscriptions());
jcommander.addCommand("unsubscribe", new DeleteSubscription());
jcommander.addCommand("create-subscription", new CreateSubscription());
jcommander.addCommand("stats", new GetStats());
jcommander.addCommand("stats-internal", new GetInternalStats());
jcommander.addCommand("info-internal", new GetInternalInfo());
jcommander.addCommand("partitioned-stats", new GetPartitionedStats());
jcommander.addCommand("partitioned-stats-internal", new GetPartitionedStatsInternal());
jcommander.addCommand("skip", new Skip());
jcommander.addCommand("clear-backlog", new ClearBacklog());
jcommander.addCommand("expire-messages", new ExpireMessages());
jcommander.addCommand("expire-messages-all-subscriptions", new ExpireMessagesForAllSubscriptions());
jcommander.addCommand("create-partitioned-topic", new CreatePartitionedCmd());
jcommander.addCommand("create-missed-partitions", new CreateMissedPartitionsCmd());
jcommander.addCommand("create", new CreateNonPartitionedCmd());
jcommander.addCommand("update-partitioned-topic", new UpdatePartitionedCmd());
jcommander.addCommand("get-partitioned-topic-metadata", new GetPartitionedTopicMetadataCmd());
jcommander.addCommand("delete-partitioned-topic", new DeletePartitionedCmd());
jcommander.addCommand("peek-messages", new PeekMessages());
jcommander.addCommand("examine-messages", new ExamineMessages());
jcommander.addCommand("get-message-by-id", new GetMessageById());
jcommander.addCommand("reset-cursor", new ResetCursor());
jcommander.addCommand("terminate", new Terminate());
jcommander.addCommand("compact", new Compact());
jcommander.addCommand("compaction-status", new CompactionStatusCmd());
jcommander.addCommand("offload", new Offload());
jcommander.addCommand("offload-status", new OffloadStatusCmd());
jcommander.addCommand("last-message-id", new GetLastMessageId());
jcommander.addCommand("get-backlog-quotas", new GetBacklogQuotaMap());
jcommander.addCommand("set-backlog-quota", new SetBacklogQuota());
jcommander.addCommand("remove-backlog-quota", new RemoveBacklogQuota());
jcommander.addCommand("get-message-ttl", new GetMessageTTL());
jcommander.addCommand("set-message-ttl", new SetMessageTTL());
jcommander.addCommand("remove-message-ttl", new RemoveMessageTTL());
jcommander.addCommand("get-retention", new GetRetention());
jcommander.addCommand("set-retention", new SetRetention());
jcommander.addCommand("remove-retention", new RemoveRetention());
//deprecated commands
jcommander.addCommand("enable-deduplication", new EnableDeduplication());
jcommander.addCommand("disable-deduplication", new DisableDeduplication());
jcommander.addCommand("get-deduplication-enabled", new GetDeduplicationStatus());
jcommander.addCommand("set-deduplication", new SetDeduplicationStatus());
jcommander.addCommand("get-deduplication", new GetDeduplicationStatus());
jcommander.addCommand("remove-deduplication", new RemoveDeduplicationStatus());
jcommander.addCommand("get-deduplication-snapshot-interval", new GetDeduplicationSnapshotInterval());
jcommander.addCommand("set-deduplication-snapshot-interval", new SetDeduplicationSnapshotInterval());
jcommander.addCommand("remove-deduplication-snapshot-interval", new RemoveDeduplicationSnapshotInterval());
jcommander.addCommand("get-delayed-delivery", new GetDelayedDelivery());
jcommander.addCommand("set-delayed-delivery", new SetDelayedDelivery());
jcommander.addCommand("remove-delayed-delivery", new RemoveDelayedDelivery());
jcommander.addCommand("get-persistence", new GetPersistence());
jcommander.addCommand("set-persistence", new SetPersistence());
jcommander.addCommand("remove-persistence", new RemovePersistence());
jcommander.addCommand("get-offload-policies", new GetOffloadPolicies());
jcommander.addCommand("set-offload-policies", new SetOffloadPolicies());
jcommander.addCommand("remove-offload-policies", new RemoveOffloadPolicies());
jcommander.addCommand("get-dispatch-rate", new GetDispatchRate());
jcommander.addCommand("set-dispatch-rate", new SetDispatchRate());
jcommander.addCommand("remove-dispatch-rate", new RemoveDispatchRate());
jcommander.addCommand("get-subscription-dispatch-rate", new GetSubscriptionDispatchRate());
jcommander.addCommand("set-subscription-dispatch-rate", new SetSubscriptionDispatchRate());
jcommander.addCommand("remove-subscription-dispatch-rate", new RemoveSubscriptionDispatchRate());
jcommander.addCommand("get-replicator-dispatch-rate", new GetReplicatorDispatchRate());
jcommander.addCommand("set-replicator-dispatch-rate", new SetReplicatorDispatchRate());
jcommander.addCommand("remove-replicator-dispatch-rate", new RemoveReplicatorDispatchRate());
jcommander.addCommand("get-compaction-threshold", new GetCompactionThreshold());
jcommander.addCommand("set-compaction-threshold", new SetCompactionThreshold());
jcommander.addCommand("remove-compaction-threshold", new RemoveCompactionThreshold());
//deprecated commands
jcommander.addCommand("get-max-unacked-messages-on-consumer", new GetMaxUnackedMessagesOnConsumer());
jcommander.addCommand("set-max-unacked-messages-on-consumer", new SetMaxUnackedMessagesOnConsumer());
jcommander.addCommand("remove-max-unacked-messages-on-consumer", new RemoveMaxUnackedMessagesOnConsumer());
jcommander.addCommand("get-max-unacked-messages-on-subscription", new GetMaxUnackedMessagesOnSubscription());
jcommander.addCommand("set-max-unacked-messages-on-subscription", new SetMaxUnackedMessagesOnSubscription());
jcommander.addCommand("remove-max-unacked-messages-on-subscription", new RemoveMaxUnackedMessagesOnSubscription());
jcommander.addCommand("get-max-unacked-messages-per-consumer", new GetMaxUnackedMessagesOnConsumer());
jcommander.addCommand("set-max-unacked-messages-per-consumer", new SetMaxUnackedMessagesOnConsumer());
jcommander.addCommand("remove-max-unacked-messages-per-consumer", new RemoveMaxUnackedMessagesOnConsumer());
jcommander.addCommand("get-max-unacked-messages-per-subscription", new GetMaxUnackedMessagesOnSubscription());
jcommander.addCommand("set-max-unacked-messages-per-subscription", new SetMaxUnackedMessagesOnSubscription());
jcommander.addCommand("remove-max-unacked-messages-per-subscription", new RemoveMaxUnackedMessagesOnSubscription());
jcommander.addCommand("get-publish-rate", new GetPublishRate());
jcommander.addCommand("set-publish-rate", new SetPublishRate());
jcommander.addCommand("remove-publish-rate", new RemovePublishRate());
jcommander.addCommand("set-subscription-types-enabled", new SetSubscriptionTypesEnabled());
jcommander.addCommand("get-subscription-types-enabled", new GetSubscriptionTypesEnabled());
//deprecated commands
jcommander.addCommand("get-maxProducers", new GetMaxProducers());
jcommander.addCommand("set-maxProducers", new SetMaxProducers());
jcommander.addCommand("remove-maxProducers", new RemoveMaxProducers());
jcommander.addCommand("get-max-producers", new GetMaxProducers());
jcommander.addCommand("set-max-producers", new SetMaxProducers());
jcommander.addCommand("remove-max-producers", new RemoveMaxProducers());
jcommander.addCommand("get-max-subscriptions", new GetMaxSubscriptionsPerTopic());
jcommander.addCommand("set-max-subscriptions", new SetMaxSubscriptionsPerTopic());
jcommander.addCommand("remove-max-subscriptions", new RemoveMaxSubscriptionsPerTopic());
jcommander.addCommand("get-max-message-size", new GetMaxMessageSize());
jcommander.addCommand("set-max-message-size", new SetMaxMessageSize());
jcommander.addCommand("remove-max-message-size", new RemoveMaxMessageSize());
jcommander.addCommand("get-max-consumers-per-subscription", new GetMaxConsumersPerSubscription());
jcommander.addCommand("set-max-consumers-per-subscription", new SetMaxConsumersPerSubscription());
jcommander.addCommand("remove-max-consumers-per-subscription", new RemoveMaxConsumersPerSubscription());
jcommander.addCommand("get-inactive-topic-policies", new GetInactiveTopicPolicies());
jcommander.addCommand("set-inactive-topic-policies", new SetInactiveTopicPolicies());
jcommander.addCommand("remove-inactive-topic-policies", new RemoveInactiveTopicPolicies());
jcommander.addCommand("get-max-consumers", new GetMaxConsumers());
jcommander.addCommand("set-max-consumers", new SetMaxConsumers());
jcommander.addCommand("remove-max-consumers", new RemoveMaxConsumers());
jcommander.addCommand("get-subscribe-rate", new GetSubscribeRate());
jcommander.addCommand("set-subscribe-rate", new SetSubscribeRate());
jcommander.addCommand("remove-subscribe-rate", new RemoveSubscribeRate());
initDeprecatedCommands();
}
private void initDeprecatedCommands() {
IUsageFormatter usageFormatter = jcommander.getUsageFormatter();
if (usageFormatter instanceof CmdUsageFormatter) {
CmdUsageFormatter cmdUsageFormatter = (CmdUsageFormatter) usageFormatter;
cmdUsageFormatter.addDeprecatedCommand("enable-deduplication");
cmdUsageFormatter.addDeprecatedCommand("disable-deduplication");
cmdUsageFormatter.addDeprecatedCommand("get-deduplication-enabled");
cmdUsageFormatter.addDeprecatedCommand("get-max-unacked-messages-on-consumer");
cmdUsageFormatter.addDeprecatedCommand("remove-max-unacked-messages-on-consumer");
cmdUsageFormatter.addDeprecatedCommand("set-max-unacked-messages-on-consumer");
cmdUsageFormatter.addDeprecatedCommand("get-max-unacked-messages-on-subscription");
cmdUsageFormatter.addDeprecatedCommand("remove-max-unacked-messages-on-subscription");
cmdUsageFormatter.addDeprecatedCommand("set-max-unacked-messages-on-subscription");
cmdUsageFormatter.addDeprecatedCommand("get-maxProducers");
cmdUsageFormatter.addDeprecatedCommand("set-maxProducers");
cmdUsageFormatter.addDeprecatedCommand("remove-maxProducers");
}
}
private Topics getTopics() {
if (topics == null) {
topics = getAdmin().topics();
}
return topics;
}
@Parameters(commandDescription = "Get the list of topics under a namespace.")
private class ListCmd extends CliCommand {
@Parameter(description = "tenant/namespace\n", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
print(getTopics().getList(namespace));
}
}
@Parameters(commandDescription = "Get the list of partitioned topics under a namespace.")
private class PartitionedTopicListCmd extends CliCommand {
@Parameter(description = "tenant/namespace\n", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
print(getTopics().getPartitionedTopicList(namespace));
}
}
@Parameters(commandDescription = "Grant a new permission to a client role on a single topic.")
private class GrantPermissions extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = "--role", description = "Client role to which grant permissions", required = true)
private String role;
@Parameter(names = "--actions", description = "Actions to be granted (produce,consume)", required = true, splitter = CommaParameterSplitter.class)
private List<String> actions;
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
getTopics().grantPermission(topic, role, getAuthActions(actions));
}
}
@Parameters(commandDescription = "Revoke permissions on a topic \n "
+ "\t\t\t Revoke permissions to a client role on a single topic. If the permission \n"
+ "\t\t\t was not set at the topic level, but rather at the namespace level, this \n"
+ "\t\t\t operation will return an error (HTTP status code 412).")
private class RevokePermissions extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = "--role", description = "Client role to which revoke permissions", required = true)
private String role;
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
getTopics().revokePermissions(topic, role);
}
}
@Parameters(commandDescription = "Get the permissions on a topic\n"
+ "\t\t Retrieve the effective permissions for a topic. These permissions are defined \n"
+ "\t\t by the permissions set at the namespace level combined (union) with any eventual \n"
+ "\t\t specific permission set on the topic.")
private class Permissions extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
print(getTopics().getPermissions(topic));
}
}
@Parameters(commandDescription = "Lookup a topic from the current serving broker")
private class Lookup extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
print(getAdmin().lookups().lookupTopic(topic));
}
}
@Parameters(commandDescription = "Lookup a partitioned topic from the current serving broker")
private class PartitionedLookup extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/partitionedTopic\n", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
print(getAdmin().lookups().lookupPartitionedTopic(topic));
}
}
@Parameters(commandDescription = "Get Namespace bundle range of a topic")
private class GetBundleRange extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
print(getAdmin().lookups().getBundleRange(topic));
}
}
@Parameters(commandDescription = "Create a partitioned topic. \n"
+ "\t\tThe partitioned topic has to be created before creating a producer on it.")
private class CreatePartitionedCmd extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
private java.util.List<String> params;
@Parameter(names = { "-p",
"--partitions" }, description = "Number of partitions for the topic", required = true)
private int numPartitions;
@Override
void run() throws Exception {
String topic = validateTopicName(params);
getTopics().createPartitionedTopic(topic, numPartitions);
}
}
@Parameters(commandDescription = "Try to create partitions for partitioned topic. \n"
+ "\t\t The partitions of partition topic has to be created, can be used by repair partitions when \n"
+ "\t\t topic auto creation is disabled")
private class CreateMissedPartitionsCmd extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
private java.util.List<String> params;
@Override
void run() throws Exception {
String topic = validateTopicName(params);
getTopics().createMissedPartitions(topic);
}
}
@Parameters(commandDescription = "Create a non-partitioned topic.")
private class CreateNonPartitionedCmd extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
private java.util.List<String> params;
@Override
void run() throws Exception {
String topic = validateTopicName(params);
getTopics().createNonPartitionedTopic(topic);
}
}
@Parameters(commandDescription = "Update existing non-global partitioned topic. \n"
+ "\t\tNew updating number of partitions must be greater than existing number of partitions.")
private class UpdatePartitionedCmd extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
private java.util.List<String> params;
@Parameter(names = { "-p",
"--partitions" }, description = "Number of partitions for the topic", required = true)
private int numPartitions;
@Override
void run() throws Exception {
String topic = validateTopicName(params);
getTopics().updatePartitionedTopic(topic, numPartitions);
}
}
@Parameters(commandDescription = "Get the partitioned topic metadata. \n"
+ "\t\tIf the topic is not created or is a non-partitioned topic, it returns empty topic with 0 partitions")
private class GetPartitionedTopicMetadataCmd extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
private java.util.List<String> params;
@Override
void run() throws Exception {
String topic = validateTopicName(params);
print(getTopics().getPartitionedTopicMetadata(topic));
}
}
@Parameters(commandDescription = "Delete a partitioned topic. \n"
+ "\t\tIt will also delete all the partitions of the topic if it exists.")
private class DeletePartitionedCmd extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
private java.util.List<String> params;
@Parameter(names = { "-f",
"--force" }, description = "Close all producer/consumer/replicator and delete topic forcefully")
private boolean force = false;
@Parameter(names = { "-d",
"--deleteSchema" }, description = "Delete schema while deleting topic")
private boolean deleteSchema = false;
@Override
void run() throws Exception {
String topic = validateTopicName(params);
getTopics().deletePartitionedTopic(topic, force);
if (deleteSchema) {
getAdmin().schemas().deleteSchema(topic);
}
}
}
@Parameters(commandDescription = "Delete a topic. \n"
+ "\t\tThe topic cannot be deleted if there's any active subscription or producers connected to it.")
private class DeleteCmd extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
private java.util.List<String> params;
@Parameter(names = { "-f",
"--force" }, description = "Close all producer/consumer/replicator and delete topic forcefully")
private boolean force = false;
@Parameter(names = { "-d",
"--deleteSchema" }, description = "Delete schema while deleting topic")
private boolean deleteSchema = false;
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
getTopics().delete(topic, force);
if (deleteSchema) {
getAdmin().schemas().deleteSchema(topic);
}
}
}
@Parameters(commandDescription = "Unload a topic. \n")
private class UnloadCmd extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
getTopics().unload(topic);
}
}
@Parameters(commandDescription = "Get the list of subscriptions on the topic")
private class ListSubscriptions extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
private java.util.List<String> params;
@Override
void run() throws Exception {
String topic = validateTopicName(params);
print(getTopics().getSubscriptions(topic));
}
}
@Parameters(commandDescription = "Delete a durable subscriber from a topic. \n"
+ "\t\tThe subscription cannot be deleted if there are any active consumers attached to it \n")
private class DeleteSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-f",
"--force" }, description = "Disconnect and close all consumers and delete subscription forcefully")
private boolean force = false;
@Parameter(names = { "-s", "--subscription" }, description = "Subscription to be deleted", required = true)
private String subName;
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
getTopics().deleteSubscription(topic, subName, force);
}
}
@Parameters(commandDescription = "Get the stats for the topic and its connected producers and consumers. \n"
+ "\t All the rates are computed over a 1 minute window and are relative the last completed 1 minute period.")
private class GetStats extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
private java.util.List<String> params;
@Parameter(names = { "-gpb",
"--get-precise-backlog" }, description = "Set true to get precise backlog")
private boolean getPreciseBacklog = false;
@Parameter(names = { "-sbs",
"--get-subscription-backlog-size" }, description = "Set true to get backlog size for each subscription"
+ ", locking required.")
private boolean subscriptionBacklogSize = false;
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
print(getTopics().getStats(topic, getPreciseBacklog, subscriptionBacklogSize));
}
}
@Parameters(commandDescription = "Get the internal stats for the topic")
private class GetInternalStats extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
private java.util.List<String> params;
@Parameter(names = { "-m",
"--metadata" }, description = "Flag to include ledger metadata")
private boolean metadata = false;
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
print(getTopics().getInternalStats(topic, metadata));
}
}
@Parameters(commandDescription = "Get the internal metadata info for the topic")
private class GetInternalInfo extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
JsonObject result = getTopics().getInternalInfo(topic);
Gson gson = new GsonBuilder().setPrettyPrinting().create();
System.out.println(gson.toJson(result));
}
}
@Parameters(commandDescription = "Get the stats for the partitioned topic and its connected producers and consumers. \n"
+ "\t All the rates are computed over a 1 minute window and are relative the last completed 1 minute period.")
private class GetPartitionedStats extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
private java.util.List<String> params;
@Parameter(names = "--per-partition", description = "Get per partition stats")
private boolean perPartition = false;
@Parameter(names = { "-gpb",
"--get-precise-backlog" }, description = "Set true to get precise backlog")
private boolean getPreciseBacklog = false;
@Parameter(names = { "-sbs",
"--get-subscription-backlog-size" }, description = "Set true to get backlog size for each subscription"
+ ", locking required.")
private boolean subscriptionBacklogSize = false;
@Override
void run() throws Exception {
String topic = validateTopicName(params);
print(topics.getPartitionedStats(topic, perPartition, getPreciseBacklog, subscriptionBacklogSize));
}
}
@Parameters(commandDescription = "Get the internal stats for the partitioned topic and its connected producers and consumers. \n"
+ "\t All the rates are computed over a 1 minute window and are relative the last completed 1 minute period.")
private class GetPartitionedStatsInternal extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
private java.util.List<String> params;
@Override
void run() throws Exception {
String topic = validateTopicName(params);
print(getTopics().getPartitionedInternalStats(topic));
}
}
@Parameters(commandDescription = "Skip all the messages for the subscription")
private class ClearBacklog extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-s", "--subscription" }, description = "Subscription to be cleared", required = true)
private String subName;
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
getTopics().skipAllMessages(topic, subName);
}
}
@Parameters(commandDescription = "Skip some messages for the subscription")
private class Skip extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-s",
"--subscription" }, description = "Subscription to be skip messages on", required = true)
private String subName;
@Parameter(names = { "-n", "--count" }, description = "Number of messages to skip", required = true)
private long numMessages;
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
getTopics().skipMessages(topic, subName, numMessages);
}
}
@Parameters(commandDescription = "Expire messages that older than given expiry time (in seconds) for the subscription")
private class ExpireMessages extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-s",
"--subscription" }, description = "Subscription to be skip messages on", required = true)
private String subName;
@Parameter(names = { "-t", "--expireTime" }, description = "Expire messages older than time in seconds")
private long expireTimeInSeconds = -1;
@Parameter(names = { "--position",
"-p" }, description = "message position to reset back to (ledgerId:entryId)", required = false)
private String messagePosition;
@Parameter(names = { "-e", "--exclude-reset-position" },
description = "Exclude the reset position, start consume messages from the next position.", required = false)
private boolean excludeResetPosition = false;
@Override
void run() throws PulsarAdminException {
if (expireTimeInSeconds >= 0 && isNotBlank(messagePosition)) {
throw new ParameterException(String.format("Can't expire message by time and " +
"by message position at the same time."));
}
String topic = validateTopicName(params);
if (expireTimeInSeconds >= 0) {
getTopics().expireMessages(topic, subName, expireTimeInSeconds);
} else if (isNotBlank(messagePosition)) {
int partitionIndex = TopicName.get(topic).getPartitionIndex();
MessageId messageId = validateMessageIdString(messagePosition, partitionIndex);
getTopics().expireMessages(topic, subName, messageId, excludeResetPosition);
} else {
throw new ParameterException(
"Either time (--expireTime) or message position (--position) has to be provided" +
" to expire messages");
}
}
}
@Parameters(commandDescription = "Expire messages that older than given expiry time (in seconds) for all subscriptions")
private class ExpireMessagesForAllSubscriptions extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-t", "--expireTime" }, description = "Expire messages older than time in seconds", required = true)
private long expireTimeInSeconds;
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
getTopics().expireMessagesForAllSubscriptions(topic, expireTimeInSeconds);
}
}
@Parameters(commandDescription = "Create a new subscription on a topic")
private class CreateSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-s",
"--subscription" }, description = "Subscription to reset position on", required = true)
private String subscriptionName;
@Parameter(names = { "--messageId",
"-m" }, description = "messageId where to create the subscription. It can be either 'latest', 'earliest' or (ledgerId:entryId)", required = false)
private String messageIdStr = "latest";
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
MessageId messageId;
if (messageIdStr.equals("latest")) {
messageId = MessageId.latest;
} else if (messageIdStr.equals("earliest")) {
messageId = MessageId.earliest;
} else {
messageId = validateMessageIdString(messageIdStr);
}
getTopics().createSubscription(topic, subscriptionName, messageId);
}
}
@Parameters(commandDescription = "Reset position for subscription to a position that is closest to timestamp or messageId.")
private class ResetCursor extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-s",
"--subscription" }, description = "Subscription to reset position on", required = true)
private String subName;
@Parameter(names = { "--time",
"-t" }, description = "time in minutes to reset back to (or minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w)", required = false)
private String resetTimeStr;
@Parameter(names = { "--messageId",
"-m" }, description = "messageId to reset back to (ledgerId:entryId)", required = false)
private String resetMessageIdStr;
@Parameter(names = { "-e", "--exclude-reset-position" },
description = "Exclude the reset position, start consume messages from the next position.", required = false)
private boolean excludeResetPosition = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
if (isNotBlank(resetMessageIdStr)) {
MessageId messageId = validateMessageIdString(resetMessageIdStr);
if (excludeResetPosition) {
getTopics().resetCursor(persistentTopic, subName, messageId, true);
} else {
getTopics().resetCursor(persistentTopic, subName, messageId);
}
} else if (isNotBlank(resetTimeStr)) {
long resetTimeInMillis = TimeUnit.SECONDS
.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr));
// now - go back time
long timestamp = System.currentTimeMillis() - resetTimeInMillis;
getTopics().resetCursor(persistentTopic, subName, timestamp);
} else {
throw new PulsarAdminException(
"Either Timestamp (--time) or Position (--position) has to be provided to reset cursor");
}
}
}
@Parameters(commandDescription = "Terminate a topic and don't allow any more messages to be published")
private class Terminate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
try {
MessageId lastMessageId = getTopics().terminateTopicAsync(persistentTopic).get();
System.out.println("Topic succesfully terminated at " + lastMessageId);
} catch (InterruptedException | ExecutionException e) {
throw new PulsarAdminException(e);
}
}
}
@Parameters(commandDescription = "Peek some messages for the subscription")
private class PeekMessages extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-s",
"--subscription" }, description = "Subscription to get messages from", required = true)
private String subName;
@Parameter(names = { "-n", "--count" }, description = "Number of messages (default 1)", required = false)
private int numMessages = 1;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
List<Message<byte[]>> messages = getTopics().peekMessages(persistentTopic, subName, numMessages);
int position = 0;
for (Message<byte[]> msg : messages) {
if (++position != 1) {
System.out.println("-------------------------------------------------------------------------\n");
}
if (msg.getMessageId() instanceof BatchMessageIdImpl) {
BatchMessageIdImpl msgId = (BatchMessageIdImpl) msg.getMessageId();
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" + msgId.getBatchIndex());
} else {
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
}
if (msg.getProperties().size() > 0) {
System.out.println("Tenants:");
print(msg.getProperties());
}
ByteBuf data = Unpooled.wrappedBuffer(msg.getData());
System.out.println(ByteBufUtil.prettyHexDump(data));
}
}
}
@Parameters(commandDescription = "Examine a specific message on a topic by position relative to the" +
" earliest or the latest message.")
private class ExamineMessages extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-i", "--initialPosition" },
description = "Relative start position to examine message." +
"It can be 'latest' or 'earliest', default is latest")
private String initialPosition = "latest";
@Parameter(names = { "-m", "--messagePosition" },
description = "The position of messages (default 1)", required = false)
private long messagePosition = 1;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
Message<byte[]> messages = getTopics().examineMessage(persistentTopic, initialPosition, messagePosition);
MessageIdImpl msgId = (MessageIdImpl) messages.getMessageId();
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
if (messages.getProperties().size() > 0) {
System.out.println("Tenants:");
print(messages.getProperties());
}
ByteBuf data = Unpooled.wrappedBuffer(messages.getData());
System.out.println(ByteBufUtil.prettyHexDump(data));
}
}
@Parameters(commandDescription = "Get message by its ledgerId and entryId")
private class GetMessageById extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-l", "--ledgerId" },
description = "ledger id pointing to the desired ledger",
required = true)
private long ledgerId;
@Parameter(names = { "-e", "--entryId" },
description = "entry id pointing to the desired entry",
required = true)
private long entryId;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
Message<byte[]> message = getTopics().getMessageById(persistentTopic, ledgerId, entryId);
if (message == null) {
System.out.println("Cannot find any messages based on ledgerId:"
+ ledgerId + " entryId:" + entryId);
} else {
ByteBuf date = Unpooled.wrappedBuffer(message.getData());
System.out.println(ByteBufUtil.prettyHexDump(date));
}
}
}
@Parameters(commandDescription = "Compact a topic")
private class Compact extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopics().triggerCompaction(persistentTopic);
System.out.println("Topic compaction requested for " + persistentTopic);
}
}
@Parameters(commandDescription = "Status of compaction on a topic")
private class CompactionStatusCmd extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-w", "--wait-complete" },
description = "Wait for compaction to complete", required = false)
private boolean wait = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
try {
LongRunningProcessStatus status = getTopics().compactionStatus(persistentTopic);
while (wait && status.status == LongRunningProcessStatus.Status.RUNNING) {
Thread.sleep(1000);
status = getTopics().compactionStatus(persistentTopic);
}
switch (status.status) {
case NOT_RUN:
System.out.println("Compaction has not been run for " + persistentTopic
+ " since broker startup");
break;
case RUNNING:
System.out.println("Compaction is currently running");
break;
case SUCCESS:
System.out.println("Compaction was a success");
break;
case ERROR:
System.out.println("Error in compaction");
throw new PulsarAdminException("Error compacting: " + status.lastError);
}
} catch (InterruptedException e) {
throw new PulsarAdminException(e);
}
}
}
static MessageId findFirstLedgerWithinThreshold(List<PersistentTopicInternalStats.LedgerInfo> ledgers,
long sizeThreshold) {
long suffixSize = 0L;
ledgers = Lists.reverse(ledgers);
long previousLedger = ledgers.get(0).ledgerId;
for (PersistentTopicInternalStats.LedgerInfo l : ledgers) {
suffixSize += l.size;
if (suffixSize > sizeThreshold) {
return new MessageIdImpl(previousLedger, 0L, -1);
}
previousLedger = l.ledgerId;
}
return null;
}
@Parameters(commandDescription = "Trigger offload of data from a topic to long-term storage (e.g. Amazon S3)")
private class Offload extends CliCommand {
@Parameter(names = { "-s", "--size-threshold" },
description = "Maximum amount of data to keep in BookKeeper for the specified topic (e.g. 10M, 5G).",
required = true)
private String sizeThresholdStr;
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
long sizeThreshold = validateSizeString(sizeThresholdStr);
String persistentTopic = validatePersistentTopic(params);
PersistentTopicInternalStats stats = getTopics().getInternalStats(persistentTopic, false);
if (stats.ledgers.size() < 1) {
throw new PulsarAdminException("Topic doesn't have any data");
}
LinkedList<PersistentTopicInternalStats.LedgerInfo> ledgers = new LinkedList(stats.ledgers);
ledgers.get(ledgers.size()-1).size = stats.currentLedgerSize; // doesn't get filled in now it seems
MessageId messageId = findFirstLedgerWithinThreshold(ledgers, sizeThreshold);
if (messageId == null) {
System.out.println("Nothing to offload");
return;
}
getTopics().triggerOffload(persistentTopic, messageId);
System.out.println("Offload triggered for " + persistentTopic + " for messages before " + messageId);
}
}
@Parameters(commandDescription = "Check the status of data offloading from a topic to long-term storage")
private class OffloadStatusCmd extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-w", "--wait-complete" },
description = "Wait for offloading to complete", required = false)
private boolean wait = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
try {
LongRunningProcessStatus status = getTopics().offloadStatus(persistentTopic);
while (wait && status.status == LongRunningProcessStatus.Status.RUNNING) {
Thread.sleep(1000);
status = getTopics().offloadStatus(persistentTopic);
}
switch (status.status) {
case NOT_RUN:
System.out.println("Offload has not been run for " + persistentTopic
+ " since broker startup");
break;
case RUNNING:
System.out.println("Offload is currently running");
break;
case SUCCESS:
System.out.println("Offload was a success");
break;
case ERROR:
System.out.println("Error in offload");
throw new PulsarAdminException("Error offloading: " + status.lastError);
}
} catch (InterruptedException e) {
throw new PulsarAdminException(e);
}
}
}
@Parameters(commandDescription = "get the last commit message id of topic")
private class GetLastMessageId extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopics().getLastMessageId(persistentTopic));
}
}
@Parameters(commandDescription = "Get the backlog quota policies for a topic")
private class GetBacklogQuotaMap extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"-ap", "--applied"}, description = "Get the applied policy of the topic")
private boolean applied = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getBacklogQuotaMap(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Set a backlog quota policy for a topic")
private class SetBacklogQuota extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)", required = true)
private String limitStr;
@Parameter(names = { "-p", "--policy" }, description = "Retention policy to enforce when the limit is reached. "
+ "Valid options are: [producer_request_hold, producer_exception, consumer_backlog_eviction]", required = true)
private String policyStr;
@Override
void run() throws PulsarAdminException {
BacklogQuota.RetentionPolicy policy;
long limit;
try {
policy = BacklogQuota.RetentionPolicy.valueOf(policyStr);
} catch (IllegalArgumentException e) {
throw new ParameterException(String.format("Invalid retention policy type '%s'. Valid options are: %s",
policyStr, Arrays.toString(BacklogQuota.RetentionPolicy.values())));
}
limit = validateSizeString(limitStr);
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().setBacklogQuota(persistentTopic, new BacklogQuota(limit, policy));
}
}
@Parameters(commandDescription = "Remove a backlog quota policy from a topic")
private class RemoveBacklogQuota extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().removeBacklogQuota(persistentTopic);
}
}
@Parameters(commandDescription = "Get the delayed delivery policy for a topic")
private class GetDelayedDelivery extends CliCommand {
@Parameter(description = "tenant/namespace/topic\n", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Override
void run() throws PulsarAdminException {
String topicName = validateTopicName(params);
print(getAdmin().topics().getDelayedDeliveryPolicy(topicName, applied));
}
}
@Parameters(commandDescription = "Set the delayed delivery policy on a topic")
private class SetDelayedDelivery extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;
@Parameter(names = { "--enable", "-e" }, description = "Enable delayed delivery messages")
private boolean enable = false;
@Parameter(names = { "--disable", "-d" }, description = "Disable delayed delivery messages")
private boolean disable = false;
@Parameter(names = { "--time", "-t" }, description = "The tick time for when retrying on delayed delivery messages, " +
"affecting the accuracy of the delivery time compared to the scheduled time. (eg: 1s, 10s, 1m, 5h, 3d)")
private String delayedDeliveryTimeStr = "1s";
@Override
void run() throws PulsarAdminException {
String topicName = validateTopicName(params);
long delayedDeliveryTimeInMills = TimeUnit.SECONDS.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(delayedDeliveryTimeStr));
if (enable == disable) {
throw new ParameterException("Need to specify either --enable or --disable");
}
getAdmin().topics().setDelayedDeliveryPolicy(topicName, new DelayedDeliveryPolicies(delayedDeliveryTimeInMills, enable));
}
}
@Parameters(commandDescription = "Remove the delayed delivery policy on a topic")
private class RemoveDelayedDelivery extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String topicName = validateTopicName(params);
getAdmin().topics().removeDelayedDeliveryPolicy(topicName);
}
}
@Parameters(commandDescription = "Get the message TTL for a topic")
private class GetMessageTTL extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getMessageTTL(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Set message TTL for a topic")
private class SetMessageTTL extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-t", "--ttl" }, description = "Message TTL for topic in second, allowed range from 1 to Integer.MAX_VALUE", required = true)
private int messageTTLInSecond;
@Override
void run() throws PulsarAdminException {
if (messageTTLInSecond < 0) {
throw new ParameterException(String.format("Invalid retention policy type '%d'. ", messageTTLInSecond));
}
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().setMessageTTL(persistentTopic, messageTTLInSecond);
}
}
@Parameters(commandDescription = "Remove message TTL for a topic")
private class RemoveMessageTTL extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().removeMessageTTL(persistentTopic);
}
}
@Parameters(commandDescription = "Get deduplication snapshot interval for a topic")
private class GetDeduplicationSnapshotInterval extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getDeduplicationSnapshotInterval(persistentTopic));
}
}
@Parameters(commandDescription = "Set deduplication snapshot interval for a topic")
private class SetDeduplicationSnapshotInterval extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-i", "--interval" }, description =
"Deduplication snapshot interval for topic in second, allowed range from 0 to Integer.MAX_VALUE", required = true)
private int interval;
@Override
void run() throws PulsarAdminException {
if (interval < 0) {
throw new ParameterException(String.format("Invalid interval '%d'. ", interval));
}
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().setDeduplicationSnapshotInterval(persistentTopic, interval);
}
}
@Parameters(commandDescription = "Remove deduplication snapshot interval for a topic")
private class RemoveDeduplicationSnapshotInterval extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().removeDeduplicationSnapshotInterval(persistentTopic);
}
}
@Parameters(commandDescription = "Get the retention policy for a topic")
private class GetRetention extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getRetention(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Set the retention policy for a topic")
private class SetRetention extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--time",
"-t" }, description = "Retention time in minutes (or minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w). "
+ "0 means no retention and -1 means infinite time retention", required = true)
private String retentionTimeStr;
@Parameter(names = { "--size", "-s" }, description = "Retention size limit (eg: 10M, 16G, 3T). "
+ "0 or less than 1MB means no retention and -1 means infinite size retention", required = true)
private String limitStr;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
long sizeLimit = validateSizeString(limitStr);
long retentionTimeInSec = RelativeTimeUtil.parseRelativeTimeInSeconds(retentionTimeStr);
final int retentionTimeInMin;
if (retentionTimeInSec != -1) {
retentionTimeInMin = (int) TimeUnit.SECONDS.toMinutes(retentionTimeInSec);
} else {
retentionTimeInMin = -1;
}
final int retentionSizeInMB;
if (sizeLimit != -1) {
retentionSizeInMB = (int) (sizeLimit / (1024 * 1024));
} else {
retentionSizeInMB = -1;
}
getAdmin().topics().setRetention(persistentTopic, new RetentionPolicies(retentionTimeInMin, retentionSizeInMB));
}
}
@Deprecated
@Parameters(commandDescription = "Enable the deduplication policy for a topic")
private class EnableDeduplication extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().enableDeduplication(persistentTopic, true);
}
}
@Deprecated
@Parameters(commandDescription = "Disable the deduplication policy for a topic")
private class DisableDeduplication extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().enableDeduplication(persistentTopic, false);
}
}
@Parameters(commandDescription = "Enable or disable deduplication for a topic")
private class SetDeduplicationStatus extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--enable", "-e" }, description = "Enable deduplication")
private boolean enable = false;
@Parameter(names = { "--disable", "-d" }, description = "Disable deduplication")
private boolean disable = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
if (enable == disable) {
throw new ParameterException("Need to specify either --enable or --disable");
}
getAdmin().topics().setDeduplicationStatus(persistentTopic, enable);
}
}
@Parameters(commandDescription = "Get the deduplication policy for a topic")
private class GetDeduplicationStatus extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getDeduplicationStatus(persistentTopic));
}
}
@Parameters(commandDescription = "Remove the deduplication policy for a topic")
private class RemoveDeduplicationStatus extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().removeDeduplicationStatus(persistentTopic);
}
}
@Parameters(commandDescription = "Remove the retention policy for a topic")
private class RemoveRetention extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().removeRetention(persistentTopic);
}
}
@Parameters(commandDescription = "Get the persistence policies for a topic")
private class GetPersistence extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getPersistence(persistentTopic));
}
}
@Parameters(commandDescription = "Get the offload policies for a topic")
private class GetOffloadPolicies extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getOffloadPolicies(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Remove the offload policies for a topic")
private class RemoveOffloadPolicies extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().removeOffloadPolicies(persistentTopic);
}
}
@Parameters(commandDescription = "Set the offload policies for a topic")
private class SetOffloadPolicies extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"-d", "--driver"}, description = "ManagedLedger offload driver", required = true)
private String driver;
@Parameter(names = {"-r", "--region"}
, description = "ManagedLedger offload region, s3 and google-cloud-storage requires this parameter")
private String region;
@Parameter(names = {"-b", "--bucket"}
, description = "ManagedLedger offload bucket, s3 and google-cloud-storage requires this parameter")
private String bucket;
@Parameter(names = {"-e", "--endpoint"}
, description = "ManagedLedger offload service endpoint, only s3 requires this parameter")
private String endpoint;
@Parameter(names = {"-i", "--aws-id"}
, description = "AWS Credential Id to use when using driver S3 or aws-s3")
private String awsId;
@Parameter(names = {"-s", "--aws-secret"}
, description = "AWS Credential Secret to use when using driver S3 or aws-s3")
private String awsSecret;
@Parameter(names = {"-m", "--maxBlockSizeInBytes"}
, description = "ManagedLedger offload max block Size in bytes, s3 and google-cloud-storage requires this parameter")
private int maxBlockSizeInBytes;
@Parameter(names = {"-rb", "--readBufferSizeInBytes"}
, description = "ManagedLedger offload read buffer size in bytes, s3 and google-cloud-storage requires this parameter")
private int readBufferSizeInBytes;
@Parameter(names = {"-t", "--offloadThresholdInBytes"}
, description = "ManagedLedger offload threshold in bytes", required = true)
private long offloadThresholdInBytes;
@Parameter(names = {"-dl", "--offloadDeletionLagInMillis"}
, description = "ManagedLedger offload deletion lag in bytes")
private Long offloadDeletionLagInMillis;
@Parameter(
names = {"--offloadedReadPriority", "-orp"},
description = "Read priority for offloaded messages. By default, once messages are offloaded to long-term storage, brokers read messages from long-term storage, but messages can still exist in BookKeeper for a period depends on your configuration. For messages that exist in both long-term storage and BookKeeper, you can set where to read messages from with the option `tiered-storage-first` or `bookkeeper-first`.",
required = false
)
private String offloadReadPriorityStr;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
OffloadedReadPriority offloadedReadPriority = OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY;
if (this.offloadReadPriorityStr != null) {
try {
offloadedReadPriority = OffloadedReadPriority.fromString(this.offloadReadPriorityStr);
} catch (Exception e) {
throw new ParameterException("--offloadedReadPriority parameter must be one of " +
Arrays.stream(OffloadedReadPriority.values())
.map(OffloadedReadPriority::toString)
.collect(Collectors.joining(","))
+ " but got: " + this.offloadReadPriorityStr, e);
}
}
OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, region, bucket, endpoint, awsId, awsSecret, maxBlockSizeInBytes
, readBufferSizeInBytes, offloadThresholdInBytes, offloadDeletionLagInMillis, offloadedReadPriority);
getAdmin().topics().setOffloadPolicies(persistentTopic, offloadPolicies);
}
}
@Parameters(commandDescription = "Set the persistence policies for a topic")
private class SetPersistence extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-e",
"--bookkeeper-ensemble" }, description = "Number of bookies to use for a topic", required = true)
private int bookkeeperEnsemble;
@Parameter(names = { "-w",
"--bookkeeper-write-quorum" }, description = "How many writes to make of each entry", required = true)
private int bookkeeperWriteQuorum;
@Parameter(names = { "-a",
"--bookkeeper-ack-quorum" }, description = "Number of acks (garanteed copies) to wait for each entry", required = true)
private int bookkeeperAckQuorum;
@Parameter(names = { "-r",
"--ml-mark-delete-max-rate" }, description = "Throttling rate of mark-delete operation (0 means no throttle)", required = true)
private double managedLedgerMaxMarkDeleteRate;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().setPersistence(persistentTopic, new PersistencePolicies(bookkeeperEnsemble,
bookkeeperWriteQuorum, bookkeeperAckQuorum, managedLedgerMaxMarkDeleteRate));
}
}
@Parameters(commandDescription = "Remove the persistence policy for a topic")
private class RemovePersistence extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().removePersistence(persistentTopic);
}
}
@Parameters(commandDescription = "Get message dispatch rate for a topic")
private class GetDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getDispatchRate(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Set message dispatch rate for a topic")
private class SetDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--msg-dispatch-rate",
"-md" }, description = "message-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
private int msgDispatchRate = -1;
@Parameter(names = { "--byte-dispatch-rate",
"-bd" }, description = "byte-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
private long byteDispatchRate = -1;
@Parameter(names = { "--dispatch-rate-period",
"-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)\n", required = false)
private int dispatchRatePeriodSec = 1;
@Parameter(names = { "--relative-to-publish-rate",
"-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))\n", required = false)
private boolean relativeToPublishRate = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().setDispatchRate(persistentTopic,
new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec, relativeToPublishRate));
}
}
@Parameters(commandDescription = "Remove message dispatch rate for a topic")
private class RemoveDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().removeDispatchRate(persistentTopic);
}
}
@Parameters(commandDescription = "Get max unacked messages policy on consumer for a topic")
private class GetMaxUnackedMessagesOnConsumer extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getMaxUnackedMessagesOnConsumer(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Remove max unacked messages policy on consumer for a topic")
private class RemoveMaxUnackedMessagesOnConsumer extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().removeMaxUnackedMessagesOnConsumer(persistentTopic);
}
}
@Parameters(commandDescription = "Set max unacked messages policy on consumer for a topic")
private class SetMaxUnackedMessagesOnConsumer extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"-m", "--maxNum"}, description = "max unacked messages num on consumer", required = true)
private int maxNum;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().setMaxUnackedMessagesOnConsumer(persistentTopic, maxNum);
}
}
@Parameters(commandDescription = "Get max unacked messages policy on subscription for a topic")
private class GetMaxUnackedMessagesOnSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getMaxUnackedMessagesOnSubscription(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Remove max unacked messages policy on subscription for a topic")
private class RemoveMaxUnackedMessagesOnSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().removeMaxUnackedMessagesOnSubscription(persistentTopic);
}
}
@Parameters(commandDescription = "Set max unacked messages policy on subscription for a topic")
private class SetMaxUnackedMessagesOnSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"-m", "--maxNum"}, description = "max unacked messages num on subscription", required = true)
private int maxNum;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().setMaxUnackedMessagesOnSubscription(persistentTopic, maxNum);
}
}
@Parameters(commandDescription = "Set subscription types enabled for a topic")
private class SetSubscriptionTypesEnabled extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--types",
"-t" }, description = "Subscription types enabled list (comma separated values)", required = true)
private String subTypes;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
Set<SubscriptionType> types = new HashSet<>();
Lists.newArrayList(subTypes.split(",")).forEach(s -> types.add(SubscriptionType.valueOf(s)));
getAdmin().topics().setSubscriptionTypesEnabled(persistentTopic, types);
}
}
@Parameters(commandDescription = "Get subscription types enabled for a topic")
private class GetSubscriptionTypesEnabled extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getSubscriptionTypesEnabled(persistentTopic));
}
}
@Parameters(commandDescription = "Get compaction threshold for a topic")
private class GetCompactionThreshold extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getCompactionThreshold(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Set compaction threshold for a topic")
private class SetCompactionThreshold extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--threshold", "-t" },
description = "Maximum number of bytes in a topic backlog before compaction is triggered "
+ "(eg: 10M, 16G, 3T). 0 disables automatic compaction",
required = true)
private String threshold = "0";
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().setCompactionThreshold(persistentTopic, validateSizeString(threshold));
}
}
@Parameters(commandDescription = "Remove compaction threshold for a topic")
private class RemoveCompactionThreshold extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().removeCompactionThreshold(persistentTopic);
}
}
@Parameters(commandDescription = "Get publish rate for a topic")
private class GetPublishRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getPublishRate(persistentTopic));
}
}
@Parameters(commandDescription = "Set publish rate for a topic")
private class SetPublishRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--msg-publish-rate",
"-m" }, description = "message-publish-rate (default -1 will be overwrite if not passed)\n", required = false)
private int msgPublishRate = -1;
@Parameter(names = { "--byte-publish-rate",
"-b" }, description = "byte-publish-rate (default -1 will be overwrite if not passed)\n", required = false)
private long bytePublishRate = -1;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().setPublishRate(persistentTopic,
new PublishRate(msgPublishRate, bytePublishRate));
}
}
@Parameters(commandDescription = "Remove publish rate for a topic")
private class RemovePublishRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().removePublishRate(persistentTopic);
}
}
@Parameters(commandDescription = "Get subscription message-dispatch-rate for a topic")
private class GetSubscriptionDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getSubscriptionDispatchRate(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Set subscription message-dispatch-rate for a topic")
private class SetSubscriptionDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--msg-dispatch-rate",
"-md" }, description = "message-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
private int msgDispatchRate = -1;
@Parameter(names = { "--byte-dispatch-rate",
"-bd" }, description = "byte-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
private long byteDispatchRate = -1;
@Parameter(names = { "--dispatch-rate-period",
"-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)\n", required = false)
private int dispatchRatePeriodSec = 1;
@Parameter(names = { "--relative-to-publish-rate",
"-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))\n", required = false)
private boolean relativeToPublishRate = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().setSubscriptionDispatchRate(persistentTopic,
new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec, relativeToPublishRate));
}
}
@Parameters(commandDescription = "Remove subscription message-dispatch-rate for a topic")
private class RemoveSubscriptionDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().removeSubscriptionDispatchRate(persistentTopic);
}
}
@Parameters(commandDescription = "Get replicator message-dispatch-rate for a topic")
private class GetReplicatorDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"-ap", "--applied"}, description = "Get the applied policy of the topic")
private boolean applied = false;
@Override
void run() throws PulsarAdminException {
String topic = validatePersistentTopic(params);
print(getAdmin().topics().getReplicatorDispatchRate(topic, applied));
}
}
@Parameters(commandDescription = "Set replicator message-dispatch-rate for a topic")
private class SetReplicatorDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--msg-dispatch-rate",
"-md" }, description = "message-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
private int msgDispatchRate = -1;
@Parameter(names = { "--byte-dispatch-rate",
"-bd" }, description = "byte-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
private long byteDispatchRate = -1;
@Parameter(names = { "--dispatch-rate-period",
"-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)\n", required = false)
private int dispatchRatePeriodSec = 1;
@Parameter(names = { "--relative-to-publish-rate",
"-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))\n", required = false)
private boolean relativeToPublishRate = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().setReplicatorDispatchRate(persistentTopic,
new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec, relativeToPublishRate));
}
}
@Parameters(commandDescription = "Remove replicator message-dispatch-rate for a topic")
private class RemoveReplicatorDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().removeReplicatorDispatchRate(persistentTopic);
}
}
@Parameters(commandDescription = "Get max number of producers for a topic")
private class GetMaxProducers extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getMaxProducers(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Set max number of producers for a topic")
private class SetMaxProducers extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"--max-producers", "-p"}, description = "Max producers for a topic", required = true)
private int maxProducers;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().setMaxProducers(persistentTopic, maxProducers);
}
}
@Parameters(commandDescription = "Remove max number of producers for a topic")
private class RemoveMaxProducers extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().removeMaxProducers(persistentTopic);
}
}
@Parameters(commandDescription = "Get max number of subscriptions for a topic")
private class GetMaxSubscriptionsPerTopic extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getMaxSubscriptionsPerTopic(persistentTopic));
}
}
@Parameters(commandDescription = "Set max number of subscriptions for a topic")
private class SetMaxSubscriptionsPerTopic extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"--max-subscriptions-per-topic", "-m"},
description = "Maximum subscription limit for a topic", required = true)
private int maxSubscriptionsPerTopic;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().setMaxSubscriptionsPerTopic(persistentTopic, maxSubscriptionsPerTopic);
}
}
@Parameters(commandDescription = "Remove max number of subscriptions for a topic")
private class RemoveMaxSubscriptionsPerTopic extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().removeMaxSubscriptionsPerTopic(persistentTopic);
}
}
@Parameters(commandDescription = "Get max message size for a topic")
private class GetMaxMessageSize extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getMaxMessageSize(persistentTopic));
}
}
@Parameters(commandDescription = "Set max message size for a topic")
private class SetMaxMessageSize extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"--max-message-size", "-m"}, description = "Max message size for a topic", required = true)
private int maxMessageSize;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().setMaxMessageSize(persistentTopic, maxMessageSize);
}
}
@Parameters(commandDescription = "Remove max message size for a topic")
private class RemoveMaxMessageSize extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().removeMaxMessageSize(persistentTopic);
}
}
@Parameters(commandDescription = "Get max consumers per subscription for a topic")
private class GetMaxConsumersPerSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getMaxConsumersPerSubscription(persistentTopic));
}
}
@Parameters(commandDescription = "Set max consumers per subscription for a topic")
private class SetMaxConsumersPerSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--max-consumers-per-subscription", "-c" }, description = "maxConsumersPerSubscription for a namespace", required = true)
private int maxConsumersPerSubscription;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().setMaxConsumersPerSubscription(persistentTopic, maxConsumersPerSubscription);
}
}
@Parameters(commandDescription = "Remove max consumers per subscription for a topic")
private class RemoveMaxConsumersPerSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().removeMaxConsumersPerSubscription(persistentTopic);
}
}
@Parameters(commandDescription = "Get the inactive topic policies on a topic")
private class GetInactiveTopicPolicies extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getInactiveTopicPolicies(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Set the inactive topic policies on a topic")
private class SetInactiveTopicPolicies extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--enable-delete-while-inactive", "-e" }, description = "Enable delete while inactive")
private boolean enableDeleteWhileInactive = false;
@Parameter(names = { "--disable-delete-while-inactive", "-d" }, description = "Disable delete while inactive")
private boolean disableDeleteWhileInactive = false;
@Parameter(names = {"--max-inactive-duration", "-t"}, description = "Max duration of topic inactivity in seconds" +
",topics that are inactive for longer than this value will be deleted (eg: 1s, 10s, 1m, 5h, 3d)", required = true)
private String deleteInactiveTopicsMaxInactiveDuration;
@Parameter(names = { "--delete-mode", "-m" }, description = "Mode of delete inactive topic" +
",Valid options are: [delete_when_no_subscriptions, delete_when_subscriptions_caught_up]", required = true)
private String inactiveTopicDeleteMode;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
long maxInactiveDurationInSeconds = TimeUnit.SECONDS.toSeconds(RelativeTimeUtil.parseRelativeTimeInSeconds(deleteInactiveTopicsMaxInactiveDuration));
if (enableDeleteWhileInactive == disableDeleteWhileInactive) {
throw new ParameterException("Need to specify either enable-delete-while-inactive or disable-delete-while-inactive");
}
InactiveTopicDeleteMode deleteMode = null;
try {
deleteMode = InactiveTopicDeleteMode.valueOf(inactiveTopicDeleteMode);
} catch (IllegalArgumentException e) {
throw new ParameterException("delete mode can only be set to delete_when_no_subscriptions or delete_when_subscriptions_caught_up");
}
getAdmin().topics().setInactiveTopicPolicies(persistentTopic,
new InactiveTopicPolicies(deleteMode, (int) maxInactiveDurationInSeconds, enableDeleteWhileInactive));
}
}
@Parameters(commandDescription = "Remove inactive topic policies from a topic")
private class RemoveInactiveTopicPolicies extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().removeInactiveTopicPolicies(persistentTopic);
}
}
@Parameters(commandDescription = "Get max number of consumers for a topic")
private class GetMaxConsumers extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getMaxConsumers(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Set max number of consumers for a topic")
private class SetMaxConsumers extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--max-consumers", "-c" }, description = "Max consumers for a topic", required = true)
private int maxConsumers;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().setMaxConsumers(persistentTopic, maxConsumers);
}
}
@Parameters(commandDescription = "Remove max number of consumers for a topic")
private class RemoveMaxConsumers extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().removeMaxConsumers(persistentTopic);
}
}
@Parameters(commandDescription = "Get consumer subscribe rate for a topic")
private class GetSubscribeRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getSubscribeRate(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Set consumer subscribe rate for a topic")
private class SetSubscribeRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--subscribe-rate",
"-sr" }, description = "subscribe-rate (default -1 will be overwrite if not passed)\n", required = false)
private int subscribeRate = -1;
@Parameter(names = { "--subscribe-rate-period",
"-st" }, description = "subscribe-rate-period in second type (default 30 second will be overwrite if not passed)\n", required = false)
private int subscribeRatePeriodSec = 30;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().setSubscribeRate(persistentTopic,
new SubscribeRate(subscribeRate, subscribeRatePeriodSec));
}
}
@Parameters(commandDescription = "Remove consumer subscribe rate for a topic")
private class RemoveSubscribeRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getAdmin().topics().removeSubscribeRate(persistentTopic);
}
}
}