blob: 00ea7fa2565a540148abd1881850c82239f8f413 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.admin.cli;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.TopicPolicies;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.util.RelativeTimeUtil;
@Parameters(commandDescription = "Operations on persistent topics")
public class CmdTopicPolicies extends CmdBase {
public CmdTopicPolicies(Supplier<PulsarAdmin> admin) {
super("topicPolicies", admin);
jcommander.addCommand("get-message-ttl", new GetMessageTTL());
jcommander.addCommand("set-message-ttl", new SetMessageTTL());
jcommander.addCommand("remove-message-ttl", new RemoveMessageTTL());
jcommander.addCommand("get-max-unacked-messages-per-consumer", new GetMaxUnackedMessagesPerConsumer());
jcommander.addCommand("set-max-unacked-messages-per-consumer", new SetMaxUnackedMessagesPerConsumer());
jcommander.addCommand("remove-max-unacked-messages-per-consumer", new RemoveMaxUnackedMessagesPerConsumer());
jcommander.addCommand("get-max-consumers-per-subscription", new GetMaxConsumersPerSubscription());
jcommander.addCommand("set-max-consumers-per-subscription", new SetMaxConsumersPerSubscription());
jcommander.addCommand("remove-max-consumers-per-subscription", new RemoveMaxConsumersPerSubscription());
jcommander.addCommand("set-subscription-types-enabled", new SetSubscriptionTypesEnabled());
jcommander.addCommand("get-subscription-types-enabled", new GetSubscriptionTypesEnabled());
jcommander.addCommand("remove-subscription-types-enabled", new RemoveSubscriptionTypesEnabled());
jcommander.addCommand("get-retention", new GetRetention());
jcommander.addCommand("set-retention", new SetRetention());
jcommander.addCommand("remove-retention", new RemoveRetention());
jcommander.addCommand("get-backlog-quota", new GetBacklogQuotaMap());
jcommander.addCommand("set-backlog-quota", new SetBacklogQuota());
jcommander.addCommand("remove-backlog-quota", new RemoveBacklogQuota());
jcommander.addCommand("get-max-producers", new GetMaxProducers());
jcommander.addCommand("set-max-producers", new SetMaxProducers());
jcommander.addCommand("remove-max-producers", new RemoveMaxProducers());
jcommander.addCommand("get-max-message-size", new GetMaxMessageSize());
jcommander.addCommand("set-max-message-size", new SetMaxMessageSize());
jcommander.addCommand("remove-max-message-size", new RemoveMaxMessageSize());
jcommander.addCommand("set-deduplication", new SetDeduplicationStatus());
jcommander.addCommand("get-deduplication", new GetDeduplicationStatus());
jcommander.addCommand("remove-deduplication", new RemoveDeduplicationStatus());
jcommander.addCommand("get-deduplication-snapshot-interval", new GetDeduplicationSnapshotInterval());
jcommander.addCommand("set-deduplication-snapshot-interval", new SetDeduplicationSnapshotInterval());
jcommander.addCommand("remove-deduplication-snapshot-interval", new RemoveDeduplicationSnapshotInterval());
jcommander.addCommand("get-persistence", new GetPersistence());
jcommander.addCommand("set-persistence", new SetPersistence());
jcommander.addCommand("remove-persistence", new RemovePersistence());
jcommander.addCommand("get-subscription-dispatch-rate", new GetSubscriptionDispatchRate());
jcommander.addCommand("set-subscription-dispatch-rate", new SetSubscriptionDispatchRate());
jcommander.addCommand("remove-subscription-dispatch-rate", new RemoveSubscriptionDispatchRate());
jcommander.addCommand("get-replicator-dispatch-rate", new GetReplicatorDispatchRate());
jcommander.addCommand("set-replicator-dispatch-rate", new SetReplicatorDispatchRate());
jcommander.addCommand("remove-replicator-dispatch-rate", new RemoveReplicatorDispatchRate());
jcommander.addCommand("get-publish-rate", new GetPublishRate());
jcommander.addCommand("set-publish-rate", new SetPublishRate());
jcommander.addCommand("remove-publish-rate", new RemovePublishRate());
jcommander.addCommand("get-compaction-threshold", new GetCompactionThreshold());
jcommander.addCommand("set-compaction-threshold", new SetCompactionThreshold());
jcommander.addCommand("remove-compaction-threshold", new RemoveCompactionThreshold());
jcommander.addCommand("get-subscribe-rate", new GetSubscribeRate());
jcommander.addCommand("set-subscribe-rate", new SetSubscribeRate());
jcommander.addCommand("remove-subscribe-rate", new RemoveSubscribeRate());
jcommander.addCommand("get-max-consumers", new GetMaxConsumers());
jcommander.addCommand("set-max-consumers", new SetMaxConsumers());
jcommander.addCommand("remove-max-consumers", new RemoveMaxConsumers());
jcommander.addCommand("get-delayed-delivery", new GetDelayedDelivery());
jcommander.addCommand("set-delayed-delivery", new SetDelayedDelivery());
jcommander.addCommand("remove-delayed-delivery", new RemoveDelayedDelivery());
jcommander.addCommand("get-dispatch-rate", new GetDispatchRate());
jcommander.addCommand("set-dispatch-rate", new SetDispatchRate());
jcommander.addCommand("remove-dispatch-rate", new RemoveDispatchRate());
jcommander.addCommand("get-offload-policies", new GetOffloadPolicies());
jcommander.addCommand("set-offload-policies", new SetOffloadPolicies());
jcommander.addCommand("remove-offload-policies", new RemoveOffloadPolicies());
jcommander.addCommand("get-max-unacked-messages-per-subscription", new GetMaxUnackedMessagesPerSubscription());
jcommander.addCommand("set-max-unacked-messages-per-subscription", new SetMaxUnackedMessagesPerSubscription());
jcommander.addCommand("remove-max-unacked-messages-per-subscription",
new RemoveMaxUnackedMessagesPerSubscription());
jcommander.addCommand("get-inactive-topic-policies", new GetInactiveTopicPolicies());
jcommander.addCommand("set-inactive-topic-policies", new SetInactiveTopicPolicies());
jcommander.addCommand("remove-inactive-topic-policies", new RemoveInactiveTopicPolicies());
jcommander.addCommand("get-max-subscriptions-per-topic", new GetMaxSubscriptionsPerTopic());
jcommander.addCommand("set-max-subscriptions-per-topic", new SetMaxSubscriptionsPerTopic());
jcommander.addCommand("remove-max-subscriptions-per-topic", new RemoveMaxSubscriptionsPerTopic());
}
@Parameters(commandDescription = "Get max consumers per subscription for a topic")
private class GetMaxConsumersPerSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getMaxConsumersPerSubscription(persistentTopic));
}
}
@Parameters(commandDescription = "Set max consumers per subscription for a topic")
private class SetMaxConsumersPerSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--max-consumers-per-subscription", "-c" },
description = "maxConsumersPerSubscription for a namespace", required = true)
private int maxConsumersPerSubscription;
@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setMaxConsumersPerSubscription(persistentTopic, maxConsumersPerSubscription);
}
}
@Parameters(commandDescription = "Remove max consumers per subscription for a topic")
private class RemoveMaxConsumersPerSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeMaxConsumersPerSubscription(persistentTopic);
}
}
@Parameters(commandDescription = "Get max unacked messages policy per consumer for a topic")
private class GetMaxUnackedMessagesPerConsumer extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getMaxUnackedMessagesOnConsumer(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Remove max unacked messages policy per consumer for a topic")
private class RemoveMaxUnackedMessagesPerConsumer extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeMaxUnackedMessagesOnConsumer(persistentTopic);
}
}
@Parameters(commandDescription = "Set max unacked messages policy per consumer for a topic")
private class SetMaxUnackedMessagesPerConsumer extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"-m", "--maxNum"}, description = "max unacked messages num on consumer", required = true)
private int maxNum;
@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setMaxUnackedMessagesOnConsumer(persistentTopic, maxNum);
}
}
@Parameters(commandDescription = "Get the message TTL for a topic")
private class GetMessageTTL extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getMessageTTL(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Set message TTL for a topic")
private class SetMessageTTL extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-t", "--ttl" }, description = "Message TTL for topic in second, "
+ "allowed range from 1 to Integer.MAX_VALUE", required = true)
private int messageTTLInSecond;
@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
if (messageTTLInSecond < 0) {
throw new ParameterException(String.format("Invalid retention policy type '%d'. ", messageTTLInSecond));
}
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setMessageTTL(persistentTopic, messageTTLInSecond);
}
}
@Parameters(commandDescription = "Remove message TTL for a topic")
private class RemoveMessageTTL extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeMessageTTL(persistentTopic);
}
}
@Parameters(commandDescription = "Set subscription types enabled for a topic")
private class SetSubscriptionTypesEnabled extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"--types", "-t"}, description = "Subscription types enabled list (comma separated values)."
+ " Possible values: (Exclusive, Shared, Failover, Key_Shared).", required = true)
private List<String> subTypes;
@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
Set<SubscriptionType> types = new HashSet<>();
subTypes.forEach(s -> {
SubscriptionType subType;
try {
subType = SubscriptionType.valueOf(s);
} catch (IllegalArgumentException exception) {
throw new ParameterException(String.format("Illegal subscription type %s. Possible values: %s.", s,
Arrays.toString(SubscriptionType.values())));
}
types.add(subType);
});
getTopicPolicies(isGlobal).setSubscriptionTypesEnabled(persistentTopic, types);
}
}
@Parameters(commandDescription = "Get subscription types enabled for a topic")
private class GetSubscriptionTypesEnabled extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getSubscriptionTypesEnabled(persistentTopic));
}
}
@Parameters(commandDescription = "Remove subscription types enabled for a topic")
private class RemoveSubscriptionTypesEnabled extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the removing operation will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeSubscriptionTypesEnabled(persistentTopic);
}
}
@Parameters(commandDescription = "Get max number of consumers for a topic")
private class GetMaxConsumers extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getMaxConsumers(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Set max number of consumers for a topic")
private class SetMaxConsumers extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--max-consumers", "-c" }, description = "Max consumers for a topic", required = true)
private int maxConsumers;
@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setMaxConsumers(persistentTopic, maxConsumers);
}
}
@Parameters(commandDescription = "Remove max number of consumers for a topic")
private class RemoveMaxConsumers extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeMaxConsumers(persistentTopic);
}
}
@Parameters(commandDescription = "Get the retention policy for a topic")
private class GetRetention extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getRetention(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Set the retention policy for a topic")
private class SetRetention extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private List<String> params;
@Parameter(names = { "--time",
"-t" }, description = "Retention time in minutes (or minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w). "
+ "0 means no retention and -1 means infinite time retention", required = true)
private String retentionTimeStr;
@Parameter(names = { "--size", "-s" }, description = "Retention size limit (eg: 10M, 16G, 3T). "
+ "0 or less than 1MB means no retention and -1 means infinite size retention", required = true)
private String limitStr;
@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
long sizeLimit = validateSizeString(limitStr);
long retentionTimeInSec = RelativeTimeUtil.parseRelativeTimeInSeconds(retentionTimeStr);
final int retentionTimeInMin;
if (retentionTimeInSec != -1) {
retentionTimeInMin = (int) TimeUnit.SECONDS.toMinutes(retentionTimeInSec);
} else {
retentionTimeInMin = -1;
}
final int retentionSizeInMB;
if (sizeLimit != -1) {
retentionSizeInMB = (int) (sizeLimit / (1024 * 1024));
} else {
retentionSizeInMB = -1;
}
getTopicPolicies(isGlobal).setRetention(persistentTopic,
new RetentionPolicies(retentionTimeInMin, retentionSizeInMB));
}
}
@Parameters(commandDescription = "Remove the retention policy for a topic")
private class RemoveRetention extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private List<String> params;
@Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ "If set to true, the removing operation will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeRetention(persistentTopic);
}
}
@Parameters(commandDescription = "Get max unacked messages policy per subscription for a topic")
private class GetMaxUnackedMessagesPerSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, the removing operation will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getMaxUnackedMessagesOnSubscription(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Remove max unacked messages policy per subscription for a topic")
private class RemoveMaxUnackedMessagesPerSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ "If set to true, the removing operation will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeMaxUnackedMessagesOnSubscription(persistentTopic);
}
}
@Parameters(commandDescription = "Set max unacked messages policy on subscription for a topic")
private class SetMaxUnackedMessagesPerSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"-m", "--maxNum"},
description = "max unacked messages num on subscription", required = true)
private int maxNum;
@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the removing operation will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setMaxUnackedMessagesOnSubscription(persistentTopic, maxNum);
}
}
@Parameters(commandDescription = "Get max number of producers for a topic")
private class GetMaxProducers extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getMaxProducers(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Set max number of producers for a topic")
private class SetMaxProducers extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"--max-producers", "-p"}, description = "Max producers for a topic", required = true)
private int maxProducers;
@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setMaxProducers(persistentTopic, maxProducers);
}
}
@Parameters(commandDescription = "Get the delayed delivery policy for a topic")
private class GetDelayedDelivery extends CliCommand {
@Parameter(description = "tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal;
@Override
void run() throws PulsarAdminException {
String topicName = validateTopicName(params);
print(getTopicPolicies(isGlobal).getDelayedDeliveryPolicy(topicName, applied));
}
}
@Parameters(commandDescription = "Set the delayed delivery policy on a topic")
private class SetDelayedDelivery extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;
@Parameter(names = { "--enable", "-e" }, description = "Enable delayed delivery messages")
private boolean enable = false;
@Parameter(names = { "--disable", "-d" }, description = "Disable delayed delivery messages")
private boolean disable = false;
@Parameter(names = { "--time", "-t" }, description = "The tick time for when retrying on "
+ "delayed delivery messages, affecting the accuracy of the delivery time compared to "
+ "the scheduled time. (eg: 1s, 10s, 1m, 5h, 3d)")
private String delayedDeliveryTimeStr = "1s";
@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal;
@Override
void run() throws PulsarAdminException {
String topicName = validateTopicName(params);
long delayedDeliveryTimeInMills;
try {
delayedDeliveryTimeInMills = TimeUnit.SECONDS.toMillis(
RelativeTimeUtil.parseRelativeTimeInSeconds(delayedDeliveryTimeStr));
} catch (IllegalArgumentException exception) {
throw new ParameterException(exception.getMessage());
}
if (enable == disable) {
throw new ParameterException("Need to specify either --enable or --disable");
}
getTopicPolicies(isGlobal).setDelayedDeliveryPolicy(topicName, DelayedDeliveryPolicies.builder()
.tickTime(delayedDeliveryTimeInMills)
.active(enable)
.build());
}
}
@Parameters(commandDescription = "Remove the delayed delivery policy on a topic")
private class RemoveDelayedDelivery extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;
@Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal;
@Override
void run() throws PulsarAdminException {
String topicName = validateTopicName(params);
getTopicPolicies(isGlobal).removeDelayedDeliveryPolicy(topicName);
}
}
@Parameters(commandDescription = "Remove max number of producers for a topic")
private class RemoveMaxProducers extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the removing operation will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeMaxProducers(persistentTopic);
}
}
@Parameters(commandDescription = "Get max message size for a topic")
private class GetMaxMessageSize extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"--global", "-g"}, description = "Whether to get this policy globally. "
+ "If set to true, broker returns global topic policies")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getMaxMessageSize(persistentTopic));
}
}
@Parameters(commandDescription = "Set max message size for a topic")
private class SetMaxMessageSize extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"--max-message-size", "-m"}, description = "Max message size for a topic", required = true)
private int maxMessageSize;
@Parameter(names = {"--global", "-g"}, description = "Whether to set this policy globally.")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setMaxMessageSize(persistentTopic, maxMessageSize);
}
}
@Parameters(commandDescription = "Remove max message size for a topic")
private class RemoveMaxMessageSize extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"--global", "-g"}, description = "Whether to remove this policy globally. ")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeMaxMessageSize(persistentTopic);
}
}
@Parameters(commandDescription = "Enable or disable status for a topic")
private class SetDeduplicationStatus extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--enable", "-e" }, description = "Enable deduplication")
private boolean enable = false;
@Parameter(names = { "--disable", "-d" }, description = "Disable deduplication")
private boolean disable = false;
@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the removing operation will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
if (enable == disable) {
throw new ParameterException("Need to specify either --enable or --disable");
}
getTopicPolicies(isGlobal).setDeduplicationStatus(persistentTopic, enable);
}
}
@Parameters(commandDescription = "Get the deduplication status for a topic")
private class GetDeduplicationStatus extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. ")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getDeduplicationStatus(persistentTopic));
}
}
@Parameters(commandDescription = "Remove the deduplication status for a topic")
private class RemoveDeduplicationStatus extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ "If set to true, the removing operation will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeDeduplicationStatus(persistentTopic);
}
}
@Parameters(commandDescription = "Get deduplication snapshot interval for a topic")
private class GetDeduplicationSnapshotInterval extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"--global", "-g"}, description = "Whether to get this policy globally. "
+ "If set to true, broker returns global topic policies")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getDeduplicationSnapshotInterval(persistentTopic));
}
}
@Parameters(commandDescription = "Set deduplication snapshot interval for a topic")
private class SetDeduplicationSnapshotInterval extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"-i", "--interval"}, description =
"Deduplication snapshot interval for topic in second, allowed range from 0 to Integer.MAX_VALUE",
required = true)
private int interval;
@Parameter(names = {"--global", "-g"}, description = "Whether to set this policy globally.")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
if (interval < 0) {
throw new ParameterException(String.format("Invalid interval '%d'. ", interval));
}
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setDeduplicationSnapshotInterval(persistentTopic, interval);
}
}
@Parameters(commandDescription = "Remove deduplication snapshot interval for a topic")
private class RemoveDeduplicationSnapshotInterval extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"--global", "-g"}, description = "Whether to remove this policy globally. ")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeDeduplicationSnapshotInterval(persistentTopic);
}
}
@Parameters(commandDescription = "Get the backlog quota policies for a topic")
private class GetBacklogQuotaMap extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"-ap", "--applied"}, description = "Get the applied policy of the topic")
private boolean applied = false;
@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getBacklogQuotaMap(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Set a backlog quota policy for a topic")
private class SetBacklogQuota extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)")
private String limitStr = "-1";
@Parameter(names = { "-lt", "--limitTime" },
description = "Time limit in second, non-positive number for disabling time limit.")
private int limitTime = -1;
@Parameter(names = { "-p", "--policy" }, description = "Retention policy to enforce when the limit is reached. "
+ "Valid options are: [producer_request_hold, producer_exception, consumer_backlog_eviction]",
required = true)
private String policyStr;
@Parameter(names = {"-t", "--type"}, description = "Backlog quota type to set. Valid options are: "
+ "destination_storage and message_age. "
+ "destination_storage limits backlog by size (in bytes). "
+ "message_age limits backlog by time, that is, message timestamp (broker or publish timestamp). "
+ "You can set size or time to control the backlog, or combine them together to control the backlog. ")
private String backlogQuotaTypeStr = BacklogQuota.BacklogQuotaType.destination_storage.name();
@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
BacklogQuota.RetentionPolicy policy;
long limit;
BacklogQuota.BacklogQuotaType backlogQuotaType;
try {
policy = BacklogQuota.RetentionPolicy.valueOf(policyStr);
} catch (IllegalArgumentException e) {
throw new ParameterException(String.format("Invalid retention policy type '%s'. Valid options are: %s",
policyStr, Arrays.toString(BacklogQuota.RetentionPolicy.values())));
}
limit = validateSizeString(limitStr);
try {
backlogQuotaType = BacklogQuota.BacklogQuotaType.valueOf(backlogQuotaTypeStr);
} catch (IllegalArgumentException e) {
throw new ParameterException(String.format("Invalid backlog quota type '%s'. Valid options are: %s",
backlogQuotaTypeStr, Arrays.toString(BacklogQuota.BacklogQuotaType.values())));
}
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setBacklogQuota(persistentTopic,
BacklogQuota.builder().limitSize(limit)
.limitTime(limitTime)
.retentionPolicy(policy)
.build(),
backlogQuotaType);
}
}
@Parameters(commandDescription = "Remove a backlog quota policy from a topic")
private class RemoveBacklogQuota extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"-t", "--type"}, description = "Backlog quota type to remove")
private String backlogQuotaType = BacklogQuota.BacklogQuotaType.destination_storage.name();
@Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ "If set to true, the removing operation will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal)
.removeBacklogQuota(persistentTopic, BacklogQuota.BacklogQuotaType.valueOf(backlogQuotaType));
}
}
@Parameters(commandDescription = "Get publish rate for a topic")
private class GetPublishRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"--global", "-g"}, description = "Whether to get this policy globally. "
+ "If set to true, broker returns global topic policies")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getPublishRate(persistentTopic));
}
}
@Parameters(commandDescription = "Set publish rate for a topic")
private class SetPublishRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"--msg-publish-rate", "-m"}, description = "message-publish-rate (default -1 will be "
+ "overwrite if not passed)", required = false)
private int msgPublishRate = -1;
@Parameter(names = {"--byte-publish-rate", "-b"}, description = "byte-publish-rate "
+ "(default -1 will be overwrite if not passed)", required = false)
private long bytePublishRate = -1;
@Parameter(names = {"--global", "-g"}, description = "Whether to set this policy globally.")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setPublishRate(persistentTopic,
new PublishRate(msgPublishRate, bytePublishRate));
}
}
@Parameters(commandDescription = "Remove publish rate for a topic")
private class RemovePublishRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"--global", "-g"}, description = "Whether to remove this policy globally. ")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removePublishRate(persistentTopic);
}
}
@Parameters(commandDescription = "Get consumer subscribe rate for a topic")
private class GetSubscribeRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Parameter(names = {"--global", "-g"}, description = "Whether to get this policy globally. "
+ "If set to true, broker returns global topic policies")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getSubscribeRate(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Set consumer subscribe rate for a topic")
private class SetSubscribeRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--subscribe-rate",
"-sr" }, description = "subscribe-rate (default -1 will be overwrite if not passed)", required = false)
private int subscribeRate = -1;
@Parameter(names = { "--subscribe-rate-period",
"-st" }, description = "subscribe-rate-period in second type "
+ "(default 30 second will be overwrite if not passed)", required = false)
private int subscribeRatePeriodSec = 30;
@Parameter(names = {"--global", "-g"}, description = "Whether to set this policy globally.")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setSubscribeRate(persistentTopic,
new SubscribeRate(subscribeRate, subscribeRatePeriodSec));
}
}
@Parameters(commandDescription = "Remove consumer subscribe rate for a topic")
private class RemoveSubscribeRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"--global", "-g"}, description = "Whether to remove this policy globally. ")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeSubscribeRate(persistentTopic);
}
}
@Parameters(commandDescription = "Get the persistence policies for a topic")
private class GetPersistence extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, broker returned global topic policies", arity = 0)
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getPersistence(persistentTopic));
}
}
@Parameters(commandDescription = "Set the persistence policies for a topic")
private class SetPersistence extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-e",
"--bookkeeper-ensemble" }, description = "Number of bookies to use for a topic", required = true)
private int bookkeeperEnsemble;
@Parameter(names = { "-w",
"--bookkeeper-write-quorum" }, description = "How many writes to make of each entry", required = true)
private int bookkeeperWriteQuorum;
@Parameter(names = { "-a", "--bookkeeper-ack-quorum" },
description = "Number of acks (guaranteed copies) to wait for each entry", required = true)
private int bookkeeperAckQuorum;
@Parameter(names = { "-r", "--ml-mark-delete-max-rate" },
description = "Throttling rate of mark-delete operation (0 means no throttle)", required = true)
private double managedLedgerMaxMarkDeleteRate;
@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously", arity = 0)
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setPersistence(persistentTopic, new PersistencePolicies(bookkeeperEnsemble,
bookkeeperWriteQuorum, bookkeeperAckQuorum, managedLedgerMaxMarkDeleteRate));
}
}
@Parameters(commandDescription = "Remove the persistence policy for a topic")
private class RemovePersistence extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ "If set to true, the removing operation will be replicate to other clusters asynchronously"
, arity = 0)
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removePersistence(persistentTopic);
}
}
@Parameters(commandDescription = "Get compaction threshold for a topic")
private class GetCompactionThreshold extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getCompactionThreshold(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Set compaction threshold for a topic")
private class SetCompactionThreshold extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--threshold", "-t" },
description = "Maximum number of bytes in a topic backlog before compaction is triggered "
+ "(eg: 10M, 16G, 3T). 0 disables automatic compaction",
required = true)
private String thresholdStr = "0";
@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
long threshold = validateSizeString(thresholdStr);
getTopicPolicies(isGlobal).setCompactionThreshold(persistentTopic, threshold);
}
}
@Parameters(commandDescription = "Remove compaction threshold for a topic")
private class RemoveCompactionThreshold extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ "If set to true, the removing operation will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeCompactionThreshold(persistentTopic);
}
}
@Parameters(commandDescription = "Get message dispatch rate for a topic")
private class GetDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getDispatchRate(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Set message dispatch rate for a topic")
private class SetDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--msg-dispatch-rate",
"-md" }, description = "message-dispatch-rate "
+ "(default -1 will be overwrite if not passed)", required = false)
private int msgDispatchRate = -1;
@Parameter(names = { "--byte-dispatch-rate",
"-bd" }, description = "byte-dispatch-rate "
+ "(default -1 will be overwrite if not passed)", required = false)
private long byteDispatchRate = -1;
@Parameter(names = { "--dispatch-rate-period",
"-dt" }, description = "dispatch-rate-period in second type "
+ "(default 1 second will be overwrite if not passed)", required = false)
private int dispatchRatePeriodSec = 1;
@Parameter(names = { "--relative-to-publish-rate",
"-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled "
+ "then broker will apply throttling value to (publish-rate + dispatch rate))", required = false)
private boolean relativeToPublishRate = false;
@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setDispatchRate(persistentTopic,
DispatchRate.builder()
.dispatchThrottlingRateInMsg(msgDispatchRate)
.dispatchThrottlingRateInByte(byteDispatchRate)
.ratePeriodInSecond(dispatchRatePeriodSec)
.relativeToPublishRate(relativeToPublishRate)
.build());
}
}
@Parameters(commandDescription = "Remove message dispatch rate for a topic")
private class RemoveDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ "If set to true, the removing operation will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeDispatchRate(persistentTopic);
}
}
@Parameters(commandDescription = "Get the inactive topic policies on a topic")
private class GetInactiveTopicPolicies extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getInactiveTopicPolicies(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Set the inactive topic policies on a topic")
private class SetInactiveTopicPolicies extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--enable-delete-while-inactive", "-e" }, description = "Enable delete while inactive")
private boolean enableDeleteWhileInactive = false;
@Parameter(names = { "--disable-delete-while-inactive", "-d" }, description = "Disable delete while inactive")
private boolean disableDeleteWhileInactive = false;
@Parameter(names = {"--max-inactive-duration", "-t"},
description = "Max duration of topic inactivity in seconds, topics that are inactive for longer than "
+ "this value will be deleted (eg: 1s, 10s, 1m, 5h, 3d)", required = true)
private String deleteInactiveTopicsMaxInactiveDuration;
@Parameter(names = { "--delete-mode", "-m" }, description = "Mode of delete inactive topic, Valid options are: "
+ "[delete_when_no_subscriptions, delete_when_subscriptions_caught_up]", required = true)
private String inactiveTopicDeleteMode;
@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
long maxInactiveDurationInSeconds;
try {
maxInactiveDurationInSeconds = TimeUnit.SECONDS.toSeconds(
RelativeTimeUtil.parseRelativeTimeInSeconds(deleteInactiveTopicsMaxInactiveDuration));
} catch (IllegalArgumentException exception) {
throw new ParameterException(exception.getMessage());
}
if (enableDeleteWhileInactive == disableDeleteWhileInactive) {
throw new ParameterException("Need to specify either enable-delete-while-inactive or "
+ "disable-delete-while-inactive");
}
InactiveTopicDeleteMode deleteMode;
try {
deleteMode = InactiveTopicDeleteMode.valueOf(inactiveTopicDeleteMode);
} catch (IllegalArgumentException e) {
throw new ParameterException("delete mode can only be set to delete_when_no_subscriptions or "
+ "delete_when_subscriptions_caught_up");
}
getTopicPolicies(isGlobal).setInactiveTopicPolicies(persistentTopic, new InactiveTopicPolicies(deleteMode,
(int) maxInactiveDurationInSeconds, enableDeleteWhileInactive));
}
}
@Parameters(commandDescription = "Remove inactive topic policies from a topic")
private class RemoveInactiveTopicPolicies extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ "If set to true, the removing operation will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeInactiveTopicPolicies(persistentTopic);
}
}
@Parameters(commandDescription = "Get replicator message-dispatch-rate for a topic")
private class GetReplicatorDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getReplicatorDispatchRate(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Set replicator message-dispatch-rate for a topic")
private class SetReplicatorDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--msg-dispatch-rate",
"-md" }, description = "message-dispatch-rate (default -1 will be overwrite if not passed)")
private int msgDispatchRate = -1;
@Parameter(names = { "--byte-dispatch-rate",
"-bd" }, description = "byte-dispatch-rate (default -1 will be overwrite if not passed)")
private long byteDispatchRate = -1;
@Parameter(names = {"--dispatch-rate-period",
"-dt"}, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not"
+ " passed)")
private int dispatchRatePeriodSec = 1;
@Parameter(names = {"--relative-to-publish-rate",
"-rp"}, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled "
+ "then broker will apply throttling value to (publish-rate + dispatch rate))")
private boolean relativeToPublishRate = false;
@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setReplicatorDispatchRate(persistentTopic,
DispatchRate.builder()
.dispatchThrottlingRateInMsg(msgDispatchRate)
.dispatchThrottlingRateInByte(byteDispatchRate)
.ratePeriodInSecond(dispatchRatePeriodSec)
.relativeToPublishRate(relativeToPublishRate)
.build());
}
}
@Parameters(commandDescription = "Remove replicator message-dispatch-rate for a topic")
private class RemoveReplicatorDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeReplicatorDispatchRate(persistentTopic);
}
}
@Parameters(commandDescription = "Get subscription message-dispatch-rate for a topic")
private class GetSubscriptionDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getSubscriptionDispatchRate(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Set subscription message-dispatch-rate for a topic")
private class SetSubscriptionDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--msg-dispatch-rate",
"-md" }, description = "message-dispatch-rate (default -1 will be overwrite if not passed)")
private int msgDispatchRate = -1;
@Parameter(names = { "--byte-dispatch-rate",
"-bd" }, description = "byte-dispatch-rate (default -1 will be overwrite if not passed)")
private long byteDispatchRate = -1;
@Parameter(names = { "--dispatch-rate-period",
"-dt" }, description = "dispatch-rate-period in second type "
+ "(default 1 second will be overwrite if not passed)")
private int dispatchRatePeriodSec = 1;
@Parameter(names = { "--relative-to-publish-rate",
"-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled "
+ "then broker will apply throttling value to (publish-rate + dispatch rate))")
private boolean relativeToPublishRate = false;
@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setSubscriptionDispatchRate(persistentTopic,
DispatchRate.builder()
.dispatchThrottlingRateInMsg(msgDispatchRate)
.dispatchThrottlingRateInByte(byteDispatchRate)
.ratePeriodInSecond(dispatchRatePeriodSec)
.relativeToPublishRate(relativeToPublishRate)
.build());
}
}
@Parameters(commandDescription = "Remove subscription message-dispatch-rate for a topic")
private class RemoveSubscriptionDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeSubscriptionDispatchRate(persistentTopic);
}
}
@Parameters(commandDescription = "Get max subscriptions for a topic")
private class GetMaxSubscriptionsPerTopic extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"--global", "-g"}, description = "Whether to get this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getMaxSubscriptionsPerTopic(persistentTopic));
}
}
@Parameters(commandDescription = "Set max subscriptions for a topic")
private class SetMaxSubscriptionsPerTopic extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"--max-subscriptions-per-topic",
"-s"}, description = "max subscriptions for a topic (default -1 will be overwrite if not passed)",
required = true)
private int maxSubscriptionPerTopic;
@Parameter(names = {"--global", "-g"}, description = "Whether to set this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setMaxSubscriptionsPerTopic(persistentTopic, maxSubscriptionPerTopic);
}
}
@Parameters(commandDescription = "Remove max subscriptions for a topic")
private class RemoveMaxSubscriptionsPerTopic extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"--global", "-g"}, description = "Whether to remove this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeMaxSubscriptionsPerTopic(persistentTopic);
}
}
@Parameters(commandDescription = "Get the offload policies for a topic")
private class GetOffloadPolicies extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getOffloadPolicies(persistentTopic, applied));
}
}
@Parameters(commandDescription = "Remove the offload policies for a topic")
private class RemoveOffloadPolicies extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ "If set to true, the removing operation will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeOffloadPolicies(persistentTopic);
}
}
@Parameters(commandDescription = "Set the offload policies for a topic")
private class SetOffloadPolicies extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"-d", "--driver"}, description = "ManagedLedger offload driver", required = true)
private String driver;
@Parameter(names = {"-r", "--region"}
, description = "ManagedLedger offload region, s3 and google-cloud-storage requires this parameter")
private String region;
@Parameter(names = {"-b", "--bucket"}
, description = "ManagedLedger offload bucket, s3 and google-cloud-storage requires this parameter")
private String bucket;
@Parameter(names = {"-e", "--endpoint"}
, description = "ManagedLedger offload service endpoint, only s3 requires this parameter")
private String endpoint;
@Parameter(names = {"-i", "--aws-id"}
, description = "AWS Credential Id to use when using driver S3 or aws-s3")
private String awsId;
@Parameter(names = {"-s", "--aws-secret"}
, description = "AWS Credential Secret to use when using driver S3 or aws-s3")
private String awsSecret;
@Parameter(names = {"--ro", "--s3-role"}
, description = "S3 Role used for STSAssumeRoleSessionCredentialsProvider")
private String s3Role;
@Parameter(names = {"--s3-role-session-name", "-rsn"}
, description = "S3 role session name used for STSAssumeRoleSessionCredentialsProvider")
private String s3RoleSessionName;
@Parameter(names = {"-m", "--maxBlockSizeInBytes"},
description = "ManagedLedger offload max block Size in bytes,"
+ "s3 and google-cloud-storage requires this parameter")
private int maxBlockSizeInBytes;
@Parameter(names = {"-rb", "--readBufferSizeInBytes"},
description = "ManagedLedger offload read buffer size in bytes,"
+ "s3 and google-cloud-storage requires this parameter")
private int readBufferSizeInBytes;
@Parameter(names = {"-t", "--offloadThresholdInBytes"}
, description = "ManagedLedger offload threshold in bytes", required = true)
private long offloadThresholdInBytes;
@Parameter(names = {"-dl", "--offloadDeletionLagInMillis"}
, description = "ManagedLedger offload deletion lag in bytes")
private Long offloadDeletionLagInMillis;
@Parameter(
names = {"--offloadedReadPriority", "-orp"},
description = "Read priority for offloaded messages. By default, once messages are offloaded to"
+ " long-term storage, brokers read messages from long-term storage, but messages can still"
+ " exist in BookKeeper for a period depends on your configuration. For messages that exist"
+ " in both long-term storage and BookKeeper, you can set where to read messages from with"
+ " the option `tiered-storage-first` or `bookkeeper-first`.",
required = false
)
private String offloadReadPriorityStr;
@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
OffloadedReadPriority offloadedReadPriority = OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY;
if (this.offloadReadPriorityStr != null) {
try {
offloadedReadPriority = OffloadedReadPriority.fromString(this.offloadReadPriorityStr);
} catch (Exception e) {
throw new ParameterException("--offloadedReadPriority parameter must be one of "
+ Arrays.stream(OffloadedReadPriority.values())
.map(OffloadedReadPriority::toString)
.collect(Collectors.joining(","))
+ " but got: " + this.offloadReadPriorityStr, e);
}
}
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(driver, region, bucket, endpoint,
s3Role, s3RoleSessionName,
awsId, awsSecret,
maxBlockSizeInBytes,
readBufferSizeInBytes, offloadThresholdInBytes, offloadDeletionLagInMillis, offloadedReadPriority);
getTopicPolicies(isGlobal).setOffloadPolicies(persistentTopic, offloadPolicies);
}
}
private TopicPolicies getTopicPolicies(boolean isGlobal) {
return getAdmin().topicPolicies(isGlobal);
}
}