blob: 766be620e876f2937d84c1512db63b421e18e5c6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.admin.cli;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.util.RelativeTimeUtil;
@Parameters(commandDescription = "Operations on persistent topics. The persistent-topics "
+ "has been deprecated in favor of topics", hidden = true)
public class CmdPersistentTopics extends CmdBase {
private Topics persistentTopics;
public CmdPersistentTopics(Supplier<PulsarAdmin> admin) {
super("persistent", admin);
jcommander.addCommand("list", new ListCmd());
jcommander.addCommand("list-partitioned-topics", new PartitionedTopicListCmd());
jcommander.addCommand("permissions", new Permissions());
jcommander.addCommand("grant-permission", new GrantPermissions());
jcommander.addCommand("revoke-permission", new RevokePermissions());
jcommander.addCommand("lookup", new Lookup());
jcommander.addCommand("bundle-range", new GetBundleRange());
jcommander.addCommand("delete", new DeleteCmd());
jcommander.addCommand("unload", new UnloadCmd());
jcommander.addCommand("truncate", new TruncateCmd());
jcommander.addCommand("subscriptions", new ListSubscriptions());
jcommander.addCommand("unsubscribe", new DeleteSubscription());
jcommander.addCommand("create-subscription", new CreateSubscription());
jcommander.addCommand("stats", new GetStats());
jcommander.addCommand("stats-internal", new GetInternalStats());
jcommander.addCommand("info-internal", new GetInternalInfo());
jcommander.addCommand("partitioned-stats", new GetPartitionedStats());
jcommander.addCommand("partitioned-stats-internal", new GetPartitionedStatsInternal());
jcommander.addCommand("skip", new Skip());
jcommander.addCommand("skip-all", new SkipAll());
jcommander.addCommand("expire-messages", new ExpireMessages());
jcommander.addCommand("expire-messages-all-subscriptions", new ExpireMessagesForAllSubscriptions());
jcommander.addCommand("create-partitioned-topic", new CreatePartitionedCmd());
jcommander.addCommand("update-partitioned-topic", new UpdatePartitionedCmd());
jcommander.addCommand("get-partitioned-topic-metadata", new GetPartitionedTopicMetadataCmd());
jcommander.addCommand("delete-partitioned-topic", new DeletePartitionedCmd());
jcommander.addCommand("peek-messages", new PeekMessages());
jcommander.addCommand("get-message-by-id", new GetMessageById());
jcommander.addCommand("last-message-id", new GetLastMessageId());
jcommander.addCommand("reset-cursor", new ResetCursor());
jcommander.addCommand("terminate", new Terminate());
jcommander.addCommand("compact", new Compact());
jcommander.addCommand("compaction-status", new CompactionStatusCmd());
}
private Topics getPersistentTopics() {
if (persistentTopics == null) {
persistentTopics = getAdmin().topics();
}
return persistentTopics;
}
@Parameters(commandDescription = "Get the list of topics under a namespace.")
private class ListCmd extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
print(getPersistentTopics().getList(namespace));
}
}
@Parameters(commandDescription = "Get the list of partitioned topics under a namespace.")
private class PartitionedTopicListCmd extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
print(getPersistentTopics().getPartitionedTopicList(namespace));
}
}
@Parameters(commandDescription = "Grant a new permission to a client role on a single topic.")
private class GrantPermissions extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = "--role", description = "Client role to which grant permissions", required = true)
private String role;
@Parameter(names = "--actions", description = "Actions to be granted (produce,consume,sources,sinks,"
+ "functions,packages)", required = true)
private List<String> actions;
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
getPersistentTopics().grantPermission(topic, role, getAuthActions(actions));
}
}
@Parameters(commandDescription = "Revoke permissions on a topic. "
+ "Revoke permissions to a client role on a single topic. If the permission "
+ "was not set at the topic level, but rather at the namespace level, this "
+ "operation will return an error (HTTP status code 412).")
private class RevokePermissions extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = "--role", description = "Client role to which revoke permissions", required = true)
private String role;
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
getPersistentTopics().revokePermissions(topic, role);
}
}
@Parameters(commandDescription = "Get the permissions on a topic. "
+ "Retrieve the effective permissions for a topic. These permissions are defined "
+ "by the permissions set at the namespace level combined (union) with any eventual "
+ "specific permission set on the topic.")
private class Permissions extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
print(getPersistentTopics().getPermissions(topic));
}
}
@Parameters(commandDescription = "Lookup a topic from the current serving broker")
private class Lookup extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
print(getAdmin().lookups().lookupTopic(topic));
}
}
@Parameters(commandDescription = "Get Namespace bundle range of a topic")
private class GetBundleRange extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
print(getAdmin().lookups().getBundleRange(topic));
}
}
@Parameters(commandDescription = "Create a partitioned topic. "
+ "The partitioned topic has to be created before creating a producer on it.")
private class CreatePartitionedCmd extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-p",
"--partitions" }, description = "Number of partitions for the topic", required = true)
private int numPartitions;
@Override
void run() throws Exception {
String persistentTopic = validatePersistentTopic(params);
getPersistentTopics().createPartitionedTopic(persistentTopic, numPartitions);
}
}
@Parameters(commandDescription = "Update existing non-global partitioned topic. "
+ "New updating number of partitions must be greater than existing number of partitions.")
private class UpdatePartitionedCmd extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-p",
"--partitions" }, description = "Number of partitions for the topic", required = true)
private int numPartitions;
@Override
void run() throws Exception {
String persistentTopic = validatePersistentTopic(params);
getPersistentTopics().updatePartitionedTopic(persistentTopic, numPartitions);
}
}
@Parameters(commandDescription = "Get the partitioned topic metadata. "
+ "If the topic is not created or is a non-partitioned topic, it returns empty topic with 0 partitions")
private class GetPartitionedTopicMetadataCmd extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws Exception {
String persistentTopic = validatePersistentTopic(params);
print(getPersistentTopics().getPartitionedTopicMetadata(persistentTopic));
}
}
@Parameters(commandDescription = "Delete a partitioned topic. "
+ "It will also delete all the partitions of the topic if it exists.")
private class DeletePartitionedCmd extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = "--force",
description = "Close all producer/consumer/replicator and delete topic forcefully")
private boolean force = false;
@Override
void run() throws Exception {
String persistentTopic = validatePersistentTopic(params);
getPersistentTopics().deletePartitionedTopic(persistentTopic, force);
}
}
@Parameters(commandDescription = "Delete a topic. "
+ "The topic cannot be deleted if there's any active subscription or producers connected to it.")
private class DeleteCmd extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = "--force",
description = "Close all producer/consumer/replicator and delete topic forcefully")
private boolean force = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getPersistentTopics().delete(persistentTopic, force);
}
}
@Parameters(commandDescription = "Unload a topic.")
private class UnloadCmd extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getPersistentTopics().unload(persistentTopic);
}
}
@Parameters(commandDescription = "Truncate a topic. \n\t\tThe truncate operation will move all cursors to the end "
+ "of the topic and delete all inactive ledgers. ")
private class TruncateCmd extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
getPersistentTopics().truncate(topic);
}
}
@Parameters(commandDescription = "Get the list of subscriptions on the topic")
private class ListSubscriptions extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws Exception {
String persistentTopic = validatePersistentTopic(params);
print(getPersistentTopics().getSubscriptions(persistentTopic));
}
}
@Parameters(commandDescription = "Delete a durable subscriber from a topic. "
+ "The subscription cannot be deleted if there are any active consumers attached to it")
private class DeleteSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-f",
"--force" }, description = "Disconnect and close all consumers and delete subscription forcefully")
private boolean force = false;
@Parameter(names = { "-s", "--subscription" }, description = "Subscription to be deleted", required = true)
private String subName;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getPersistentTopics().deleteSubscription(persistentTopic, subName, force);
}
}
@Parameters(commandDescription = "Get the stats for the topic and its connected producers and consumers. "
+ "All the rates are computed over a 1 minute window and are relative the last completed 1 minute period.")
private class GetStats extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-gpb", "--get-precise-backlog" }, description = "Set true to get precise backlog")
private boolean getPreciseBacklog = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(persistentTopics.getStats(persistentTopic, getPreciseBacklog));
}
}
@Parameters(commandDescription = "Get the internal stats for the topic")
private class GetInternalStats extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-m",
"--metadata" }, description = "Flag to include ledger metadata")
private boolean metadata = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getPersistentTopics().getInternalStats(persistentTopic, metadata));
}
}
@Parameters(commandDescription = "Get the internal metadata info for the topic")
private class GetInternalInfo extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
String result = getPersistentTopics().getInternalInfo(persistentTopic);
Gson gson = new GsonBuilder().setPrettyPrinting().create();
System.out.println(gson.toJson(result));
}
}
@Parameters(commandDescription = "Get the stats for the partitioned topic and "
+ "its connected producers and consumers. All the rates are computed over a 1 minute window and "
+ "are relative the last completed 1 minute period.")
private class GetPartitionedStats extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = "--per-partition", description = "Get per partition stats")
private boolean perPartition = false;
@Override
void run() throws Exception {
String persistentTopic = validatePersistentTopic(params);
print(getPersistentTopics().getPartitionedStats(persistentTopic, perPartition));
}
}
@Parameters(commandDescription = "Get the stats-internal for the partitioned topic and "
+ "its connected producers and consumers. All the rates are computed over a 1 minute window and "
+ "are relative the last completed 1 minute period.")
private class GetPartitionedStatsInternal extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws Exception {
String persistentTopic = validatePersistentTopic(params);
print(getPersistentTopics().getPartitionedInternalStats(persistentTopic));
}
}
@Parameters(commandDescription = "Skip all the messages for the subscription")
private class SkipAll extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-s", "--subscription" }, description = "Subscription to be cleared", required = true)
private String subName;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getPersistentTopics().skipAllMessages(persistentTopic, subName);
}
}
@Parameters(commandDescription = "Skip some messages for the subscription")
private class Skip extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-s",
"--subscription" }, description = "Subscription to be skip messages on", required = true)
private String subName;
@Parameter(names = { "-n", "--count" }, description = "Number of messages to skip", required = true)
private long numMessages;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getPersistentTopics().skipMessages(persistentTopic, subName, numMessages);
}
}
@Parameters(commandDescription = "Expire messages that older than given expiry time (in seconds) "
+ "for the subscription")
private class ExpireMessages extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-s",
"--subscription" }, description = "Subscription to be skip messages on", required = true)
private String subName;
@Parameter(names = { "-t", "--expireTime" },
description = "Expire messages older than time in seconds", required = true)
private long expireTimeInSeconds;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getPersistentTopics().expireMessages(persistentTopic, subName, expireTimeInSeconds);
}
}
@Parameters(commandDescription = "Expire messages that older than given expiry time (in seconds) "
+ "for all subscriptions")
private class ExpireMessagesForAllSubscriptions extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-t", "--expireTime" },
description = "Expire messages older than time in seconds", required = true)
private long expireTimeInSeconds;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getPersistentTopics().expireMessagesForAllSubscriptions(persistentTopic, expireTimeInSeconds);
}
}
@Parameters(commandDescription = "Create a new subscription on a topic")
private class CreateSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-s",
"--subscription" }, description = "Subscription to reset position on", required = true)
private String subscriptionName;
@Parameter(names = { "--messageId",
"-m" }, description = "messageId where to create the subscription. "
+ "It can be either 'latest', 'earliest' or (ledgerId:entryId)", required = false)
private String messageIdStr = "latest";
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
MessageId messageId;
if (messageIdStr.equals("latest")) {
messageId = MessageId.latest;
} else if (messageIdStr.equals("earliest")) {
messageId = MessageId.earliest;
} else {
messageId = validateMessageIdString(messageIdStr);
}
getPersistentTopics().createSubscription(persistentTopic, subscriptionName, messageId);
}
}
@Parameters(commandDescription = "Reset position for subscription to position closest to timestamp or messageId")
private class ResetCursor extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-s",
"--subscription" }, description = "Subscription to reset position on", required = true)
private String subName;
@Parameter(names = { "--time",
"-t" }, description = "time in minutes to reset back to "
+ "(or minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w)", required = false)
private String resetTimeStr;
@Parameter(names = { "--messageId",
"-m" }, description = "messageId to reset back to (ledgerId:entryId)", required = false)
private String resetMessageIdStr;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
if (isNotBlank(resetMessageIdStr)) {
MessageId messageId = validateMessageIdString(resetMessageIdStr);
getPersistentTopics().resetCursor(persistentTopic, subName, messageId);
} else if (isNotBlank(resetTimeStr)) {
long resetTimeInMillis;
try {
resetTimeInMillis = TimeUnit.SECONDS.toMillis(
RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr));
} catch (IllegalArgumentException exception) {
throw new ParameterException(exception.getMessage());
}
// now - go back time
long timestamp = System.currentTimeMillis() - resetTimeInMillis;
getPersistentTopics().resetCursor(persistentTopic, subName, timestamp);
} else {
throw new PulsarAdminException(
"Either Timestamp (--time) or Position (--position) has to be provided to reset cursor");
}
}
}
@Parameters(commandDescription = "Terminate a topic and don't allow any more messages to be published")
private class Terminate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
try {
MessageId lastMessageId = getPersistentTopics().terminateTopicAsync(persistentTopic).get();
System.out.println("Topic successfully terminated at " + lastMessageId);
} catch (InterruptedException | ExecutionException e) {
throw new PulsarAdminException(e);
}
}
}
@Parameters(commandDescription = "Peek some messages for the subscription")
private class PeekMessages extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-s",
"--subscription" }, description = "Subscription to get messages from", required = true)
private String subName;
@Parameter(names = { "-n", "--count" }, description = "Number of messages (default 1)", required = false)
private int numMessages = 1;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
List<Message<byte[]>> messages = getPersistentTopics().peekMessages(persistentTopic, subName, numMessages);
int position = 0;
for (Message<byte[]> msg : messages) {
if (++position != 1) {
System.out.println("-------------------------------------------------------------------------\n");
}
if (msg.getMessageId() instanceof BatchMessageIdImpl) {
BatchMessageIdImpl msgId = (BatchMessageIdImpl) msg.getMessageId();
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
+ msgId.getBatchIndex());
} else {
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
}
if (msg.getProperties().size() > 0) {
System.out.println("Properties:");
print(msg.getProperties());
}
ByteBuf data = Unpooled.wrappedBuffer(msg.getData());
System.out.println(ByteBufUtil.prettyHexDump(data));
}
}
}
@Parameters(commandDescription = "Get message by its ledgerId and entryId")
private class GetMessageById extends CliCommand {
@Parameter(description = "persistent://property/cluster/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-l", "--ledgerId" },
description = "ledger id pointing to the desired ledger",
required = true)
private long ledgerId;
@Parameter(names = { "-e", "--entryId" },
description = "entry id pointing to the desired entry",
required = true)
private long entryId;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
Message<byte[]> message = getPersistentTopics().getMessageById(persistentTopic, ledgerId, entryId);
ByteBuf date = Unpooled.wrappedBuffer(message.getData());
System.out.println(ByteBufUtil.prettyHexDump(date));
}
}
@Parameters(commandDescription = "Get last message Id of the topic")
private class GetLastMessageId extends CliCommand {
@Parameter(description = "persistent://property/cluster/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
MessageId messageId = getPersistentTopics().getLastMessageId(persistentTopic);
print(messageId);
}
}
@Parameters(commandDescription = "Compact a topic")
private class Compact extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getPersistentTopics().triggerCompaction(persistentTopic);
System.out.println("Topic compaction requested for " + persistentTopic);
}
}
@Parameters(commandDescription = "Status of compaction on a topic")
private class CompactionStatusCmd extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "-w", "--wait-complete" },
description = "Wait for compaction to complete", required = false)
private boolean wait = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
try {
LongRunningProcessStatus status = getPersistentTopics().compactionStatus(persistentTopic);
while (wait && status.status == LongRunningProcessStatus.Status.RUNNING) {
Thread.sleep(1000);
status = getPersistentTopics().compactionStatus(persistentTopic);
}
switch (status.status) {
case NOT_RUN:
System.out.println("Compaction has not been run for " + persistentTopic
+ " since broker startup");
break;
case RUNNING:
System.out.println("Compaction is currently running");
break;
case SUCCESS:
System.out.println("Compaction was a success");
break;
case ERROR:
System.out.println("Error in compaction");
throw new PulsarAdminException("Error compacting: " + status.lastError);
}
} catch (InterruptedException e) {
throw new PulsarAdminException(e);
}
}
}
}