blob: a13884a21e63c0cfce9d8e8bb7ae8adb67bfb22d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import org.apache.pulsar.common.api.proto.PulsarApi;
import static org.apache.pulsar.common.util.Codec.decode;
import com.github.zafarkhaja.semver.Version;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.ZkAdminPaths;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AuthPolicies;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*/
public class PersistentTopicsBase extends AdminResource {
private static final Logger log = LoggerFactory.getLogger(PersistentTopicsBase.class);
public static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
private static final int OFFLINE_TOPIC_STAT_TTL_MINS = 10;
private static final String DEPRECATED_CLIENT_VERSION_PREFIX = "Pulsar-CPP-v";
private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX = Version.forIntegers(1, 21);
protected List<String> internalGetList() {
validateAdminAccessForTenant(namespaceName.getTenant());
// Validate that namespace exists, throws 404 if it doesn't exist
try {
policiesCache().get(path(POLICIES, namespaceName.toString()));
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to get topic list {}: Namespace does not exist", clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (Exception e) {
log.error("[{}] Failed to get topic list {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
List<String> topics = Lists.newArrayList();
try {
String path = String.format("/managed-ledgers/%s/%s", namespaceName.toString(), domain());
for (String topic : managedLedgerListCache().get(path)) {
if (domain().equals(TopicDomain.persistent.toString())) {
topics.add(TopicName.get(domain(), namespaceName, decode(topic)).toString());
}
}
} catch (KeeperException.NoNodeException e) {
// NoNode means there are no topics in this domain for this namespace
} catch (Exception e) {
log.error("[{}] Failed to get topics list for namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
topics.sort(null);
return topics;
}
protected List<String> internalGetPartitionedTopicList() {
validateAdminAccessForTenant(namespaceName.getTenant());
// Validate that namespace exists, throws 404 if it doesn't exist
try {
policiesCache().get(path(POLICIES, namespaceName.toString()));
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to get partitioned topic list {}: Namespace does not exist", clientAppId(),
namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (Exception e) {
log.error("[{}] Failed to get partitioned topic list for namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
return getPartitionedTopicList(TopicDomain.getEnum(domain()));
}
protected Map<String, Set<AuthAction>> internalGetPermissionsOnTopic() {
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
validateAdminAccessForTenant(namespaceName.getTenant());
String topicUri = topicName.toString();
try {
Policies policies = policiesCache().get(path(POLICIES, namespaceName.toString()))
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
Map<String, Set<AuthAction>> permissions = Maps.newTreeMap();
AuthPolicies auth = policies.auth_policies;
// First add namespace level permissions
for (String role : auth.namespace_auth.keySet()) {
permissions.put(role, auth.namespace_auth.get(role));
}
// Then add topic level permissions
if (auth.destination_auth.containsKey(topicUri)) {
for (Map.Entry<String, Set<AuthAction>> entry : auth.destination_auth.get(topicUri).entrySet()) {
String role = entry.getKey();
Set<AuthAction> topicPermissions = entry.getValue();
if (!permissions.containsKey(role)) {
permissions.put(role, topicPermissions);
} else {
// Do the union between namespace and topic level
Set<AuthAction> union = Sets.union(permissions.get(role), topicPermissions);
permissions.put(role, union);
}
}
}
return permissions;
} catch (Exception e) {
log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri, e);
throw new RestException(e);
}
}
protected void validateAdminAndClientPermission() {
try {
validateAdminAccessForTenant(topicName.getTenant());
} catch (Exception ve) {
try {
checkAuthorization(pulsar(), topicName, clientAppId(), clientAuthData());
} catch (RestException re) {
throw re;
} catch (Exception e) {
// unknown error marked as internal server error
log.warn("Unexpected error while authorizing request. topic={}, role={}. Error: {}",
topicName, clientAppId(), e.getMessage(), e);
throw new RestException(e);
}
}
}
public void validateAdminOperationOnTopic(boolean authoritative) {
validateAdminAccessForTenant(topicName.getTenant());
validateTopicOwnership(topicName, authoritative);
}
protected void validateAdminAccessForSubscriber(String subscriptionName, boolean authoritative) {
validateTopicOwnership(topicName, authoritative);
try {
validateAdminAccessForTenant(topicName.getTenant());
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("[{}] failed to validate admin access for {}", topicName, clientAppId());
}
validateAdminAccessForSubscriber(subscriptionName);
}
}
private void validateAdminAccessForSubscriber(String subscriptionName) {
try {
if (!pulsar().getBrokerService().getAuthorizationService().canConsume(topicName, clientAppId(),
clientAuthData(), subscriptionName)) {
log.warn("[{}} Subscriber {} is not authorized to access api", topicName, clientAppId());
throw new RestException(Status.UNAUTHORIZED,
String.format("Subscriber %s is not authorized to access this operation", clientAppId()));
}
} catch (RestException re) {
throw re;
} catch (Exception e) {
// unknown error marked as internal server error
log.warn("Unexpected error while authorizing request. topic={}, role={}. Error: {}", topicName,
clientAppId(), e.getMessage(), e);
throw new RestException(e);
}
}
protected void internalGrantPermissionsOnTopic(String role, Set<AuthAction> actions) {
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
String topicUri = topicName.toString();
try {
Stat nodeStat = new Stat();
byte[] content = globalZk().getData(path(POLICIES, namespaceName.toString()), null, nodeStat);
Policies policies = jsonMapper().readValue(content, Policies.class);
if (!policies.auth_policies.destination_auth.containsKey(topicUri)) {
policies.auth_policies.destination_auth.put(topicUri, new TreeMap<String, Set<AuthAction>>());
}
policies.auth_policies.destination_auth.get(topicUri).put(role, actions);
// Write the new policies to zookeeper
globalZk().setData(path(POLICIES, namespaceName.toString()), jsonMapper().writeValueAsBytes(policies),
nodeStat.getVersion());
// invalidate the local cache to force update
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
log.info("[{}] Successfully granted access for role {}: {} - topic {}", clientAppId(), role, actions,
topicUri);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to grant permissions on topic {}: Namespace does not exist", clientAppId(),
topicUri);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn("[{}] Failed to grant permissions on topic {}: concurrent modification", clientAppId(),
topicUri);
throw new RestException(Status.CONFLICT, "Concurrent modification");
}
catch (Exception e) {
log.error("[{}] Failed to grant permissions for topic {}", clientAppId(), topicUri, e);
throw new RestException(e);
}
}
protected void internalDeleteTopicForcefully(boolean authoritative) {
validateAdminOperationOnTopic(authoritative);
Topic topic = getTopicReference(topicName);
try {
topic.deleteForcefully().get();
} catch (Exception e) {
log.error("[{}] Failed to delete topic forcefully {}", clientAppId(), topicName, e);
throw new RestException(e);
}
}
protected void internalRevokePermissionsOnTopic(String role) {
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
String topicUri = topicName.toString();
Stat nodeStat = new Stat();
Policies policies;
try {
byte[] content = globalZk().getData(path(POLICIES, namespaceName.toString()), null, nodeStat);
policies = jsonMapper().readValue(content, Policies.class);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to revoke permissions on topic {}: Namespace does not exist", clientAppId(),
topicUri);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn("[{}] Failed to revoke permissions on topic {}: concurrent modification", clientAppId(),
topicUri);
throw new RestException(Status.CONFLICT, "Concurrent modification");
}
catch (Exception e) {
log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicUri, e);
throw new RestException(e);
}
if (!policies.auth_policies.destination_auth.containsKey(topicUri)
|| !policies.auth_policies.destination_auth.get(topicUri).containsKey(role)) {
log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level",
clientAppId(), role, topicUri);
throw new RestException(Status.PRECONDITION_FAILED, "Permissions are not set at the topic level");
}
policies.auth_policies.destination_auth.get(topicUri).remove(role);
try {
// Write the new policies to zookeeper
String namespacePath = path(POLICIES, namespaceName.toString());
globalZk().setData(namespacePath, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
// invalidate the local cache to force update
policiesCache().invalidate(namespacePath);
globalZkCache().invalidate(namespacePath);
log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role,
topicUri);
} catch (Exception e) {
log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicUri, e);
throw new RestException(e);
}
}
protected void internalCreatePartitionedTopic(int numPartitions) {
validateAdminAccessForTenant(topicName.getTenant());
if (numPartitions <= 0) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
}
try {
boolean topicExist = pulsar().getNamespaceService()
.getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
.join()
.contains(topicName.toString());
if (topicExist) {
log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
throw new RestException(Status.CONFLICT, "This topic already exists");
}
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
throw new RestException(e);
}
try {
String path = ZkAdminPaths.partitionedTopicPath(topicName);
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
zkCreateOptimistic(path, data);
// we wait for the data to be synced in all quorums and the observers
Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
} catch (KeeperException.NodeExistsException e) {
log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
throw new RestException(Status.CONFLICT, "Partitioned topic already exists");
} catch (KeeperException.BadVersionException e) {
log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", clientAppId(),
topicName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
throw new RestException(e);
}
}
protected void internalCreateNonPartitionedTopic(boolean authoritative) {
validateAdminAccessForTenant(topicName.getTenant());
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
validateTopicOwnership(topicName, authoritative);
try {
Topic createdTopic = getOrCreateTopic(topicName);
log.info("[{}] Successfully created non-partitioned topic {}", clientAppId(), createdTopic);
} catch (Exception e) {
log.error("[{}] Failed to create non-partitioned topic {}", clientAppId(), topicName, e);
throw new RestException(e);
}
}
/**
* It updates number of partitions of an existing partitioned topic. It requires partitioned-topic to
* already exist and number of new partitions must be greater than existing number of partitions. Decrementing
* number of partitions requires deletion of topic which is not supported.
*
* Already created partitioned producers and consumers can't see newly created partitions and it requires to
* recreate them at application so, newly created producers and consumers can connect to newly added partitions as
* well. Therefore, it can violate partition ordering at producers until all producers are restarted at application.
*
* @param numPartitions
*/
protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateLocalTopicOnly) {
validateAdminAccessForTenant(topicName.getTenant());
if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) {
Set<String> clusters = getNamespaceReplicatedClusters(topicName.getNamespaceObject());
if (!clusters.contains(pulsar().getConfig().getClusterName())) {
log.error("[{}] local cluster is not part of replicated cluster for namespace {}", clientAppId(),
topicName);
throw new RestException(Status.FORBIDDEN, "Local cluster is not part of replicate cluster list");
}
try {
createSubscriptions(topicName, numPartitions).get();
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
throw (RestException) e.getCause();
}
log.error("[{}] Failed to update partitioned topic {}", clientAppId(), topicName, e);
throw new RestException(e);
}
// if this cluster is the first hop which needs to coordinate with other clusters then update partitions in
// other clusters and then update number of partitions.
if (!updateLocalTopicOnly) {
CompletableFuture<Void> updatePartition = new CompletableFuture<>();
final String path = ZkAdminPaths.partitionedTopicPath(topicName);
updatePartitionInOtherCluster(numPartitions, clusters).thenAccept((res) -> {
try {
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
globalZk().setData(path, data, -1, (rc, path1, ctx, stat) -> {
if (rc == KeeperException.Code.OK.intValue()) {
updatePartition.complete(null);
} else {
updatePartition.completeExceptionally(KeeperException
.create(KeeperException.Code.get(rc), "failed to create update partitions"));
}
}, null);
} catch (Exception e) {
updatePartition.completeExceptionally(e);
}
}).exceptionally(ex -> {
updatePartition.completeExceptionally(ex);
return null;
});
try {
updatePartition.get();
} catch (Exception e) {
log.error("{} Failed to update number of partitions in zk for topic {} and partitions {}",
clientAppId(), topicName, numPartitions, e);
if (e.getCause() instanceof RestException) {
throw (RestException) e.getCause();
}
throw new RestException(e);
}
}
return;
}
if (numPartitions <= 0) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
}
try {
updatePartitionedTopic(topicName, numPartitions).get();
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
throw (RestException) e.getCause();
}
log.error("[{}] Failed to update partitioned topic {}", clientAppId(), topicName, e);
throw new RestException(e);
}
}
private CompletableFuture<Void> updatePartitionInOtherCluster(int numPartitions, Set<String> clusters) {
List<CompletableFuture<Void>> results = new ArrayList<>(clusters.size() -1);
clusters.forEach(cluster -> {
if (cluster.equals(pulsar().getConfig().getClusterName())) {
return;
}
results.add(pulsar().getBrokerService().getClusterPulsarAdmin(cluster).topics()
.updatePartitionedTopicAsync(topicName.toString(), numPartitions, true));
});
return FutureUtil.waitForAll(results);
}
protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean authoritative, boolean checkAllowAutoCreation) {
PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName, authoritative, checkAllowAutoCreation);
if (metadata.partitions > 1) {
validateClientVersion();
}
return metadata;
}
protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative, boolean force) {
validateAdminAccessForTenant(topicName.getTenant());
final CompletableFuture<Void> future = new CompletableFuture<>();
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
final int numPartitions = partitionMetadata.partitions;
if (numPartitions > 0) {
final AtomicInteger count = new AtomicInteger(numPartitions);
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
pulsar().getAdminClient().topics().deleteAsync(topicNamePartition.toString(), force)
.whenComplete((r, ex) -> {
if (ex != null) {
if (ex instanceof NotFoundException) {
// if the sub-topic is not found, the client might not have called create
// producer or it might have been deleted earlier, so we ignore the 404 error.
// For all other exception, we fail the delete partition method even if a single
// partition is failed to be deleted
if (log.isDebugEnabled()) {
log.debug("[{}] Partition not found: {}", clientAppId(),
topicNamePartition);
}
} else {
log.error("[{}] Failed to delete partition {}", clientAppId(),
topicNamePartition, ex);
future.completeExceptionally(ex);
return;
}
} else {
log.info("[{}] Deleted partition {}", clientAppId(), topicNamePartition);
}
if (count.decrementAndGet() == 0) {
future.complete(null);
}
});
} catch (Exception e) {
log.error("[{}] Failed to delete partition {}", clientAppId(), topicNamePartition, e);
future.completeExceptionally(e);
}
}
} else {
future.complete(null);
}
future.whenComplete((r, ex) -> {
if (ex != null) {
if (ex instanceof PreconditionFailedException) {
asyncResponse.resume(
new RestException(Status.PRECONDITION_FAILED, "Topic has active producers/subscriptions"));
return;
} else if (ex instanceof PulsarAdminException) {
asyncResponse.resume(new RestException((PulsarAdminException) ex));
return;
} else {
asyncResponse.resume(new RestException(ex));
return;
}
}
// Only tries to delete the znode for partitioned topic when all its partitions are successfully deleted
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
topicName.getEncodedLocalName());
try {
globalZk().delete(path, -1);
globalZkCache().invalidate(path);
// we wait for the data to be synced in all quorums and the observers
Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
log.info("[{}] Deleted partitioned topic {}", clientAppId(), topicName);
asyncResponse.resume(Response.noContent().build());
return;
} catch (KeeperException.NoNodeException nne) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned topic does not exist"));
return;
} catch (KeeperException.BadVersionException e) {
log.warn("[{}] Failed to delete partitioned topic {}: concurrent modification", clientAppId(),
topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
return;
} catch (Exception e) {
log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, e);
asyncResponse.resume(new RestException(e));
return;
}
});
}
protected void internalUnloadTopic(boolean authoritative) {
log.info("[{}] Unloading topic {}", clientAppId(), topicName);
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
unloadTopic(topicName, authoritative);
}
protected void internalDeleteTopic(boolean authoritative, boolean force) {
if (force) {
internalDeleteTopicForcefully(authoritative);
} else {
internalDeleteTopic(authoritative);
}
}
protected void internalDeleteTopic(boolean authoritative) {
validateAdminOperationOnTopic(authoritative);
Topic topic = getTopicReference(topicName);
// v2 topics have a global name so check if the topic is replicated.
if (topic.isReplicated()) {
// Delete is disallowed on global topic
final List<String> clusters = topic.getReplicators().keys();
log.error("[{}] Delete forbidden topic {} is replicated on clusters {}",
clientAppId(), topicName, clusters);
throw new RestException(Status.FORBIDDEN, "Delete forbidden topic is replicated on clusters " + clusters);
}
try {
topic.delete().get();
log.info("[{}] Successfully removed topic {}", clientAppId(), topicName);
} catch (Exception e) {
Throwable t = e.getCause();
log.error("[{}] Failed to delete topic {}", clientAppId(), topicName, t);
if (t instanceof TopicBusyException) {
throw new RestException(Status.PRECONDITION_FAILED, "Topic has active producers/subscriptions");
} else {
throw new RestException(t);
}
}
}
protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
final List<String> subscriptions = Lists.newArrayList();
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
try {
// get the subscriptions only from the 1st partition since all the other partitions will have the same
// subscriptions
pulsar().getAdminClient().topics().getSubscriptionsAsync(topicName.getPartition(0).toString())
.whenComplete((r, ex) -> {
if (ex != null) {
log.warn("[{}] Failed to get list of subscriptions for {}: {}", clientAppId(),
topicName, ex.getMessage());
if (ex instanceof PulsarAdminException) {
PulsarAdminException pae = (PulsarAdminException) ex;
if (pae.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Internal topics have not been generated yet"));
return;
} else {
asyncResponse.resume(new RestException(pae));
return;
}
} else {
asyncResponse.resume(new RestException(ex));
return;
}
}
subscriptions.addAll(r);
asyncResponse.resume(subscriptions);
return;
});
} catch (Exception e) {
log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, e);
asyncResponse.resume(e);
return;
}
} else {
validateAdminOperationOnTopic(authoritative);
Topic topic = getTopicReference(topicName);
try {
topic.getSubscriptions().forEach((subName, sub) -> subscriptions.add(subName));
asyncResponse.resume(subscriptions);
return;
} catch (Exception e) {
log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, e);
asyncResponse.resume(new RestException(e));
return;
}
}
}
protected TopicStats internalGetStats(boolean authoritative) {
validateAdminAndClientPermission();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
validateTopicOwnership(topicName, authoritative);
Topic topic = getTopicReference(topicName);
return topic.getStats();
}
protected PersistentTopicInternalStats internalGetInternalStats(boolean authoritative) {
validateAdminAndClientPermission();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
validateTopicOwnership(topicName, authoritative);
Topic topic = getTopicReference(topicName);
return topic.getInternalStats();
}
protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse) {
validateAdminAccessForTenant(topicName.getTenant());
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
String managedLedger = topicName.getPersistenceNamingEncoding();
pulsar().getManagedLedgerFactory().asyncGetManagedLedgerInfo(managedLedger, new ManagedLedgerInfoCallback() {
@Override
public void getInfoComplete(ManagedLedgerInfo info, Object ctx) {
asyncResponse.resume((StreamingOutput) output -> {
jsonMapper().writer().writeValue(output, info);
});
}
@Override
public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
asyncResponse.resume(exception);
}
}, null);
}
protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative,
boolean perPartition) {
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions == 0) {
throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
}
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
PartitionedTopicStats stats = new PartitionedTopicStats(partitionMetadata);
List<CompletableFuture<TopicStats>> topicStatsFutureList = Lists.newArrayList();
for (int i = 0; i < partitionMetadata.partitions; i++) {
try {
topicStatsFutureList
.add(pulsar().getAdminClient().topics().getStatsAsync((topicName.getPartition(i).toString())));
} catch (PulsarServerException e) {
asyncResponse.resume(new RestException(e));
return;
}
}
FutureUtil.waitForAll(topicStatsFutureList).handle((result, exception) -> {
CompletableFuture<TopicStats> statFuture = null;
for (int i = 0; i < topicStatsFutureList.size(); i++) {
statFuture = topicStatsFutureList.get(i);
if (statFuture.isDone() && !statFuture.isCompletedExceptionally()) {
try {
stats.add(statFuture.get());
if (perPartition) {
stats.partitions.put(topicName.getPartition(i).toString(), statFuture.get());
}
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
return null;
}
}
}
if (perPartition && stats.partitions.isEmpty()) {
String path = ZkAdminPaths.partitionedTopicPath(topicName);
try {
boolean zkPathExists = zkPathExists(path);
if (zkPathExists) {
stats.partitions.put(topicName.toString(), new TopicStats());
} else {
asyncResponse.resume(
new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet"));
return null;
}
} catch (KeeperException | InterruptedException e) {
asyncResponse.resume(new RestException(e));
return null;
}
}
asyncResponse.resume(stats);
return null;
});
}
protected void internalGetPartitionedStatsInternal(AsyncResponse asyncResponse, boolean authoritative) {
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions == 0) {
throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
}
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
PartitionedTopicInternalStats stats = new PartitionedTopicInternalStats(partitionMetadata);
List<CompletableFuture<PersistentTopicInternalStats>> topicStatsFutureList = Lists.newArrayList();
for (int i = 0; i < partitionMetadata.partitions; i++) {
try {
topicStatsFutureList.add(pulsar().getAdminClient().topics()
.getInternalStatsAsync((topicName.getPartition(i).toString())));
} catch (PulsarServerException e) {
asyncResponse.resume(new RestException(e));
return;
}
}
FutureUtil.waitForAll(topicStatsFutureList).handle((result, exception) -> {
CompletableFuture<PersistentTopicInternalStats> statFuture = null;
for (int i = 0; i < topicStatsFutureList.size(); i++) {
statFuture = topicStatsFutureList.get(i);
if (statFuture.isDone() && !statFuture.isCompletedExceptionally()) {
try {
stats.partitions.put(topicName.getPartition(i).toString(), statFuture.get());
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
return null;
}
}
}
asyncResponse.resume(!stats.partitions.isEmpty() ? stats
: new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet"));
return null;
});
}
protected void internalDeleteSubscription(AsyncResponse asyncResponse, String subName, boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics()
.deleteSubscriptionAsync(topicNamePartition.toString(), subName));
} catch (Exception e) {
log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicNamePartition, subName,
e);
asyncResponse.resume(new RestException(e));
return;
}
}
FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
return null;
} else if (t instanceof PreconditionFailedException) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Subscription has active connected consumers"));
return null;
} else {
log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, t);
asyncResponse.resume(new RestException(t));
return null;
}
}
asyncResponse.resume(Response.noContent().build());
return null;
});
} else {
validateAdminAccessForSubscriber(subName, authoritative);
Topic topic = getTopicReference(topicName);
try {
Subscription sub = topic.getSubscription(subName);
checkNotNull(sub);
sub.delete().get();
log.info("[{}][{}] Deleted subscription {}", clientAppId(), topicName, subName);
asyncResponse.resume(Response.noContent().build());
return;
} catch (Exception e) {
Throwable t = e.getCause();
if (e instanceof NullPointerException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
return;
} else if (t instanceof SubscriptionBusyException) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Subscription has active connected consumers"));
return;
} else {
log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, e);
asyncResponse.resume(new RestException(t));
return;
}
}
}
}
protected void internalSkipAllMessages(AsyncResponse asyncResponse, String subName, boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics().skipAllMessagesAsync(topicNamePartition.toString(),
subName));
} catch (Exception e) {
log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicNamePartition, subName, e);
asyncResponse.resume(new RestException(e));
return;
}
}
FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
return null;
} else {
log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, t);
asyncResponse.resume(new RestException(t));
return null;
}
}
asyncResponse.resume(Response.noContent().build());
return null;
});
} else {
validateAdminAccessForSubscriber(subName, authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
BiConsumer<Void, Throwable> biConsumer = (v, ex) -> {
if (ex != null) {
asyncResponse.resume(new RestException(ex));
log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, ex);
} else {
asyncResponse.resume(Response.noContent().build());
log.info("[{}] Cleared backlog on {} {}", clientAppId(), topicName, subName);
}
};
try {
if (subName.startsWith(topic.getReplicatorPrefix())) {
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
checkNotNull(repl);
repl.clearBacklog().whenComplete(biConsumer);
} else {
PersistentSubscription sub = topic.getSubscription(subName);
checkNotNull(sub);
sub.clearBacklog().whenComplete(biConsumer);
}
} catch (Exception e) {
if (e instanceof NullPointerException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
} else {
asyncResponse.resume(new RestException(e));
}
}
}
}
protected void internalSkipMessages(String subName, int numMessages, boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Skip messages on a partitioned topic is not allowed");
}
validateAdminAccessForSubscriber(subName, authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
try {
if (subName.startsWith(topic.getReplicatorPrefix())) {
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
checkNotNull(repl);
repl.skipMessages(numMessages).get();
} else {
PersistentSubscription sub = topic.getSubscription(subName);
checkNotNull(sub);
sub.skipMessages(numMessages).get();
}
log.info("[{}] Skipped {} messages on {} {}", clientAppId(), numMessages, topicName, subName);
} catch (NullPointerException npe) {
throw new RestException(Status.NOT_FOUND, "Subscription not found");
} catch (Exception exception) {
log.error("[{}] Failed to skip {} messages {} {}", clientAppId(), numMessages, topicName, subName,
exception);
throw new RestException(exception);
}
}
protected void internalExpireMessagesForAllSubscriptions(AsyncResponse asyncResponse, int expireTimeInSeconds,
boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
// expire messages for each partition topic
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics().expireMessagesForAllSubscriptionsAsync(
topicNamePartition.toString(), expireTimeInSeconds));
} catch (Exception e) {
log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds,
topicNamePartition, e);
asyncResponse.resume(new RestException(e));
return;
}
}
FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable t = exception.getCause();
log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds,
topicName, t);
asyncResponse.resume(new RestException(t));
return null;
}
asyncResponse.resume(Response.noContent().build());
return null;
});
} else {
// validate ownership and redirect if current broker is not owner
validateAdminOperationOnTopic(authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
final AtomicReference<Throwable> exception = new AtomicReference<>();
topic.getReplicators().forEach((subName, replicator) -> {
try {
internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative);
} catch (Throwable t) {
exception.set(t);
}
});
topic.getSubscriptions().forEach((subName, subscriber) -> {
try {
internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative);
} catch (Throwable t) {
exception.set(t);
}
});
if (exception.get() != null) {
if (exception.get() instanceof WebApplicationException) {
WebApplicationException wae = (WebApplicationException) exception.get();
asyncResponse.resume(wae);
return;
} else {
asyncResponse.resume(new RestException(exception.get()));
return;
}
}
asyncResponse.resume(Response.noContent().build());
return;
}
}
protected void internalResetCursor(AsyncResponse asyncResponse, String subName, long timestamp,
boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
final int numPartitions = partitionMetadata.partitions;
if (numPartitions > 0) {
final CompletableFuture<Void> future = new CompletableFuture<>();
final AtomicInteger count = new AtomicInteger(numPartitions);
final AtomicInteger failureCount = new AtomicInteger(0);
final AtomicReference<Throwable> partitionException = new AtomicReference<>();
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
pulsar().getAdminClient().topics()
.resetCursorAsync(topicNamePartition.toString(), subName, timestamp).handle((r, ex) -> {
if (ex != null) {
if (ex instanceof PreconditionFailedException) {
// throw the last exception if all partitions get this error
// any other exception on partition is reported back to user
failureCount.incrementAndGet();
partitionException.set(ex);
} else {
log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
clientAppId(), topicNamePartition, subName, timestamp, ex);
future.completeExceptionally(ex);
return null;
}
}
if (count.decrementAndGet() == 0) {
future.complete(null);
}
return null;
});
} catch (Exception e) {
log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(),
topicNamePartition, subName, timestamp, e);
future.completeExceptionally(e);
}
}
future.whenComplete((r, ex) -> {
if (ex != null) {
if (ex instanceof PulsarAdminException) {
asyncResponse.resume(new RestException((PulsarAdminException) ex));
return;
} else {
asyncResponse.resume(new RestException(ex));
return;
}
}
// report an error to user if unable to reset for all partitions
if (failureCount.get() == numPartitions) {
log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName,
subName, timestamp, partitionException.get());
asyncResponse.resume(
new RestException(Status.PRECONDITION_FAILED, partitionException.get().getMessage()));
return;
} else if (failureCount.get() > 0) {
log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}", clientAppId(),
topicName, subName, timestamp, partitionException.get());
}
asyncResponse.resume(Response.noContent().build());
return;
});
} else {
validateAdminAccessForSubscriber(subName, authoritative);
log.info("[{}] [{}] Received reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
timestamp);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
if (topic == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
return;
}
try {
PersistentSubscription sub = topic.getSubscription(subName);
checkNotNull(sub);
sub.resetCursor(timestamp).get();
log.info("[{}] [{}] Reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
timestamp);
asyncResponse.resume(Response.noContent().build());
return;
} catch (Exception e) {
Throwable t = e.getCause();
log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName,
subName, timestamp, e);
if (e instanceof NullPointerException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
return;
} else if (e instanceof NotAllowedException) {
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, e.getMessage()));
return;
} else if (t instanceof SubscriptionInvalidCursorPosition) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Unable to find position for timestamp specified -" + t.getMessage()));
return;
} else {
asyncResponse.resume(new RestException(e));
return;
}
}
}
}
protected void internalCreateSubscription(AsyncResponse asyncResponse, String subscriptionName,
MessageIdImpl messageId, boolean authoritative, boolean replicated) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
final MessageIdImpl targetMessageId = messageId == null ? (MessageIdImpl) MessageId.earliest : messageId;
log.info("[{}][{}] Creating subscription {} at message id {}", clientAppId(), topicName, subscriptionName,
targetMessageId);
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
final int numPartitions = partitionMetadata.partitions;
if (numPartitions > 0) {
final CompletableFuture<Void> future = new CompletableFuture<>();
final AtomicInteger count = new AtomicInteger(numPartitions);
final AtomicInteger failureCount = new AtomicInteger(0);
final AtomicReference<Throwable> partitionException = new AtomicReference<>();
// Create the subscription on each partition
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
pulsar().getAdminClient().topics()
.createSubscriptionAsync(topicNamePartition.toString(), subscriptionName, targetMessageId)
.handle((r, ex) -> {
if (ex != null) {
// fail the operation on unknown exception or if all the partitioned failed due to
// subscription-already-exist
if (failureCount.incrementAndGet() == numPartitions
|| !(ex instanceof PulsarAdminException.ConflictException)) {
partitionException.set(ex);
}
}
if (count.decrementAndGet() == 0) {
future.complete(null);
}
return null;
});
} catch (Exception e) {
log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(),
topicNamePartition, subscriptionName, targetMessageId, e);
future.completeExceptionally(e);
}
}
future.whenComplete((r, ex) -> {
if (ex != null) {
if (ex instanceof PulsarAdminException) {
asyncResponse.resume(new RestException((PulsarAdminException) ex));
return;
} else {
asyncResponse.resume(new RestException(ex));
return;
}
}
if (partitionException.get() != null) {
log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), topicName,
subscriptionName, targetMessageId, partitionException.get());
if (partitionException.get() instanceof PulsarAdminException) {
asyncResponse.resume(new RestException((PulsarAdminException) partitionException.get()));
return;
} else {
asyncResponse.resume(new RestException(partitionException.get()));
return;
}
}
asyncResponse.resume(Response.noContent().build());
return;
});
} else {
validateAdminAccessForSubscriber(subscriptionName, authoritative);
PersistentTopic topic = (PersistentTopic) getOrCreateTopic(topicName);
if (topic.getSubscriptions().containsKey(subscriptionName)) {
asyncResponse.resume(new RestException(Status.CONFLICT, "Subscription already exists for topic"));
return;
}
try {
PersistentSubscription subscription = (PersistentSubscription) topic
.createSubscription(subscriptionName, InitialPosition.Latest, replicated).get();
// Mark the cursor as "inactive" as it was created without a real consumer connected
subscription.deactivateCursor();
subscription.resetCursor(PositionImpl.get(targetMessageId.getLedgerId(), targetMessageId.getEntryId()))
.get();
} catch (Throwable e) {
Throwable t = e.getCause();
log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), topicName,
subscriptionName, targetMessageId, e);
if (t instanceof SubscriptionInvalidCursorPosition) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Unable to find position for position specified: " + t.getMessage()));
return;
} else {
asyncResponse.resume(new RestException(e));
return;
}
}
log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), topicName,
subscriptionName, targetMessageId);
asyncResponse.resume(Response.noContent().build());
return;
}
}
protected void internalResetCursorOnPosition(String subName, boolean authoritative, MessageIdImpl messageId) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
log.info("[{}][{}] received reset cursor on subscription {} to position {}", clientAppId(), topicName,
subName, messageId);
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
log.warn("[{}] Not supported operation on partitioned-topic {} {}", clientAppId(), topicName,
subName);
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Reset-cursor at position is not allowed for partitioned-topic");
} else {
validateAdminAccessForSubscriber(subName, authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
if (topic == null) {
throw new RestException(Status.NOT_FOUND, "Topic not found");
}
try {
PersistentSubscription sub = topic.getSubscription(subName);
checkNotNull(sub);
sub.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get();
log.info("[{}][{}] successfully reset cursor on subscription {} to position {}", clientAppId(),
topicName, subName, messageId);
} catch (Exception e) {
Throwable t = e.getCause();
log.warn("[{}] [{}] Failed to reset cursor on subscription {} to position {}", clientAppId(),
topicName, subName, messageId, e);
if (e instanceof NullPointerException) {
throw new RestException(Status.NOT_FOUND, "Subscription not found");
} else if (t instanceof SubscriptionInvalidCursorPosition) {
throw new RestException(Status.PRECONDITION_FAILED,
"Unable to find position for position specified: " + t.getMessage());
} else {
throw new RestException(e);
}
}
}
}
protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed");
}
validateAdminAccessForSubscriber(subName, authoritative);
if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), topicName,
subName);
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Skip messages on a non-persistent topic is not allowed");
}
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
PersistentReplicator repl = null;
PersistentSubscription sub = null;
Entry entry = null;
if (subName.startsWith(topic.getReplicatorPrefix())) {
repl = getReplicatorReference(subName, topic);
} else {
sub = (PersistentSubscription) getSubscriptionReference(subName, topic);
}
try {
if (subName.startsWith(topic.getReplicatorPrefix())) {
entry = repl.peekNthMessage(messagePosition).get();
} else {
entry = sub.peekNthMessage(messagePosition).get();
}
checkNotNull(entry);
PositionImpl pos = (PositionImpl) entry.getPosition();
ByteBuf metadataAndPayload = entry.getDataBuffer();
// moves the readerIndex to the payload
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
ResponseBuilder responseBuilder = Response.ok();
responseBuilder.header("X-Pulsar-Message-ID", pos.toString());
for (KeyValue keyValue : metadata.getPropertiesList()) {
responseBuilder.header("X-Pulsar-PROPERTY-" + keyValue.getKey(), keyValue.getValue());
}
if (metadata.hasPublishTime()) {
responseBuilder.header("X-Pulsar-publish-time", DateFormatter.format(metadata.getPublishTime()));
}
if (metadata.hasEventTime()) {
responseBuilder.header("X-Pulsar-event-time", DateFormatter.format(metadata.getEventTime()));
}
if (metadata.hasNumMessagesInBatch()) {
responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch());
}
// Decode if needed
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
ByteBuf uncompressedPayload = codec.decode(metadataAndPayload, metadata.getUncompressedSize());
// Copy into a heap buffer for output stream compatibility
ByteBuf data = PulsarByteBufAllocator.DEFAULT.heapBuffer(uncompressedPayload.readableBytes(),
uncompressedPayload.readableBytes());
data.writeBytes(uncompressedPayload);
uncompressedPayload.release();
StreamingOutput stream = new StreamingOutput() {
@Override
public void write(OutputStream output) throws IOException, WebApplicationException {
output.write(data.array(), data.arrayOffset(), data.readableBytes());
data.release();
}
};
return responseBuilder.entity(stream).build();
} catch (NullPointerException npe) {
throw new RestException(Status.NOT_FOUND, "Message not found");
} catch (Exception exception) {
log.error("[{}] Failed to get message at position {} from {} {}", clientAppId(), messagePosition,
topicName, subName, exception);
throw new RestException(exception);
} finally {
if (entry != null) {
entry.release();
}
}
}
protected PersistentOfflineTopicStats internalGetBacklog(boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
// Validate that namespace exists, throw 404 if it doesn't exist
// note that we do not want to load the topic and hence skip validateAdminOperationOnTopic()
try {
policiesCache().get(path(POLICIES, namespaceName.toString()));
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to get topic backlog {}: Namespace does not exist", clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (Exception e) {
log.error("[{}] Failed to get topic backlog {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
PersistentOfflineTopicStats offlineTopicStats = null;
try {
offlineTopicStats = pulsar().getBrokerService().getOfflineTopicStat(topicName);
if (offlineTopicStats != null) {
// offline topic stat has a cost - so use cached value until TTL
long elapsedMs = System.currentTimeMillis() - offlineTopicStats.statGeneratedAt.getTime();
if (TimeUnit.MINUTES.convert(elapsedMs, TimeUnit.MILLISECONDS) < OFFLINE_TOPIC_STAT_TTL_MINS) {
return offlineTopicStats;
}
}
final ManagedLedgerConfig config = pulsar().getBrokerService().getManagedLedgerConfig(topicName)
.get();
ManagedLedgerOfflineBacklog offlineTopicBacklog = new ManagedLedgerOfflineBacklog(config.getDigestType(),
config.getPassword(), pulsar().getAdvertisedAddress(), false);
offlineTopicStats = offlineTopicBacklog.estimateUnloadedTopicBacklog(
(ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory(), topicName);
pulsar().getBrokerService().cacheOfflineTopicStats(topicName, offlineTopicStats);
} catch (Exception exception) {
throw new RestException(exception);
}
return offlineTopicStats;
}
protected MessageId internalTerminate(boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Termination of a partitioned topic is not allowed");
}
validateAdminOperationOnTopic(authoritative);
Topic topic = getTopicReference(topicName);
try {
return ((PersistentTopic) topic).terminate().get();
} catch (Exception exception) {
log.error("[{}] Failed to terminated topic {}", clientAppId(), topicName, exception);
throw new RestException(exception);
}
}
protected void internalExpireMessages(AsyncResponse asyncResponse, String subName, int expireTimeInSeconds,
boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
// expire messages for each partition topic
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics().expireMessagesAsync(topicNamePartition.toString(),
subName, expireTimeInSeconds));
} catch (Exception e) {
log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds,
topicNamePartition, e);
asyncResponse.resume(new RestException(e));
return;
}
}
FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
return null;
} else {
log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds,
topicName, t);
asyncResponse.resume(new RestException(t));
return null;
}
}
asyncResponse.resume(Response.noContent().build());
return null;
});
} else {
try {
internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
return;
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
return;
}
asyncResponse.resume(Response.noContent().build());
return;
}
}
private void internalExpireMessagesForSinglePartition(String subName, int expireTimeInSeconds,
boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
String msg = "This method should not be called for partitioned topic";
log.error("[{}] {} {} {}", clientAppId(), msg, topicName, subName);
throw new IllegalStateException(msg);
}
// validate ownership and redirect if current broker is not owner
validateAdminAccessForSubscriber(subName, authoritative);
if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), topicName, subName);
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Expire messages on a non-persistent topic is not allowed");
}
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
try {
if (subName.startsWith(topic.getReplicatorPrefix())) {
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
checkNotNull(repl);
repl.expireMessages(expireTimeInSeconds);
} else {
PersistentSubscription sub = topic.getSubscription(subName);
checkNotNull(sub);
sub.expireMessages(expireTimeInSeconds);
}
log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), expireTimeInSeconds, topicName,
subName);
} catch (NullPointerException npe) {
throw new RestException(Status.NOT_FOUND, "Subscription not found");
} catch (Exception exception) {
log.error("[{}] Failed to expire messages up to {} on {} with subscription {} {}", clientAppId(),
expireTimeInSeconds, topicName, subName, exception);
throw new RestException(exception);
}
}
protected void internalTriggerCompaction(boolean authoritative) {
validateAdminOperationOnTopic(authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
try {
topic.triggerCompaction();
} catch (AlreadyRunningException e) {
throw new RestException(Status.CONFLICT, e.getMessage());
} catch (Exception e) {
throw new RestException(e);
}
}
protected LongRunningProcessStatus internalCompactionStatus(boolean authoritative) {
validateAdminOperationOnTopic(authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
return topic.compactionStatus();
}
protected void internalTriggerOffload(boolean authoritative, MessageIdImpl messageId) {
validateAdminOperationOnTopic(authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
try {
topic.triggerOffload(messageId);
} catch (AlreadyRunningException e) {
throw new RestException(Status.CONFLICT, e.getMessage());
} catch (Exception e) {
log.warn("Unexpected error triggering offload", e);
throw new RestException(e);
}
}
protected OffloadProcessStatus internalOffloadStatus(boolean authoritative) {
validateAdminOperationOnTopic(authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
return topic.offloadStatus();
}
public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(PulsarService pulsar,
String clientAppId, String originalPrincipal, AuthenticationDataSource authenticationData, TopicName topicName) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
try {
// (1) authorize client
try {
checkAuthorization(pulsar, topicName, clientAppId, authenticationData);
} catch (RestException e) {
try {
validateAdminAccessForTenant(pulsar, clientAppId, originalPrincipal, topicName.getTenant());
} catch (RestException authException) {
log.warn("Failed to authorize {} on cluster {}", clientAppId, topicName.toString());
throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
clientAppId, topicName.toString(), authException.getMessage()));
}
} catch (Exception ex) {
// throw without wrapping to PulsarClientException that considers: unknown error marked as internal
// server error
log.warn("Failed to authorize {} on cluster {} with unexpected exception {}", clientAppId,
topicName.toString(), ex.getMessage(), ex);
throw ex;
}
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, topicName.getNamespace(), topicName.getDomain().toString(),
topicName.getEncodedLocalName());
// validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
// producer/consumer
checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())
.thenCompose(res -> pulsar.getBrokerService()
.fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
.thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
metadata.partitions);
}
metadataFuture.complete(metadata);
}).exceptionally(ex -> {
metadataFuture.completeExceptionally(ex.getCause());
return null;
});
} catch (Exception ex) {
metadataFuture.completeExceptionally(ex);
}
return metadataFuture;
}
/**
* Get the Topic object reference from the Pulsar broker
*/
private Topic getTopicReference(TopicName topicName) {
try {
return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
.get(pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS)
.orElseThrow(() -> topicNotFoundReason(topicName));
} catch (RestException e) {
throw e;
} catch (Exception e) {
throw new RestException(e);
}
}
private RestException topicNotFoundReason(TopicName topicName) {
if (!topicName.isPartitioned()) {
return new RestException(Status.NOT_FOUND, "Topic not found");
}
PartitionedTopicMetadata partitionedTopicMetadata = getPartitionedTopicMetadata(
TopicName.get(topicName.getPartitionedTopicName()), false, false);
if (partitionedTopicMetadata == null || partitionedTopicMetadata.partitions == 0) {
final String topicErrorType = partitionedTopicMetadata == null ?
"has no metadata" : "has zero partitions";
return new RestException(Status.NOT_FOUND, String.format(
"Partitioned Topic not found: %s %s", topicName.toString(), topicErrorType));
} else if (!internalGetList().contains(topicName.toString())) {
return new RestException(Status.NOT_FOUND, "Topic partitions were not yet created");
}
return new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
}
private Topic getOrCreateTopic(TopicName topicName) {
return pulsar().getBrokerService().getTopic(topicName.toString(), true).thenApply(Optional::get).join();
}
/**
* Get the Subscription object reference from the Topic reference
*/
private Subscription getSubscriptionReference(String subName, PersistentTopic topic) {
try {
Subscription sub = topic.getSubscription(subName);
return checkNotNull(sub);
} catch (Exception e) {
throw new RestException(Status.NOT_FOUND, "Subscription not found");
}
}
/**
* Get the Replicator object reference from the Topic reference
*/
private PersistentReplicator getReplicatorReference(String replName, PersistentTopic topic) {
try {
String remoteCluster = PersistentReplicator.getRemoteCluster(replName);
PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
return checkNotNull(repl);
} catch (Exception e) {
throw new RestException(Status.NOT_FOUND, "Replicator not found");
}
}
private CompletableFuture<Void> updatePartitionedTopic(TopicName topicName, int numPartitions) {
final String path = ZkAdminPaths.partitionedTopicPath(topicName);
CompletableFuture<Void> updatePartition = new CompletableFuture<>();
createSubscriptions(topicName, numPartitions).thenAccept(res -> {
try {
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
globalZk().setData(path, data, -1, (rc, path1, ctx, stat) -> {
if (rc == KeeperException.Code.OK.intValue()) {
updatePartition.complete(null);
} else {
updatePartition.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc),
"failed to create update partitions"));
}
}, null);
} catch (Exception e) {
updatePartition.completeExceptionally(e);
}
}).exceptionally(ex -> {
updatePartition.completeExceptionally(ex);
return null;
});
return updatePartition;
}
/**
* It creates subscriptions for new partitions of existing partitioned-topics
*
* @param topicName
* : topic-name: persistent://prop/cluster/ns/topic
* @param numPartitions
* : number partitions for the topics
*/
private CompletableFuture<Void> createSubscriptions(TopicName topicName, int numPartitions) {
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, topicName.getPersistenceNamingEncoding());
CompletableFuture<Void> result = new CompletableFuture<>();
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions <= 1) {
result.completeExceptionally(new RestException(Status.CONFLICT, "Topic is not partitioned topic"));
return;
}
if (partitionMetadata.partitions >= numPartitions) {
result.completeExceptionally(new RestException(Status.CONFLICT,
"number of partitions must be more than existing " + partitionMetadata.partitions));
return;
}
PulsarAdmin admin;
try {
admin = pulsar().getAdminClient();
} catch (PulsarServerException e1) {
result.completeExceptionally(e1);
return;
}
admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats -> {
stats.subscriptions.keySet().forEach(subscription -> {
List<CompletableFuture<Void>> subscriptionFutures = new ArrayList<>();
for (int i = partitionMetadata.partitions; i < numPartitions; i++) {
final String topicNamePartition = topicName.getPartition(i).toString();
subscriptionFutures.add(admin.topics().createSubscriptionAsync(topicNamePartition,
subscription, MessageId.latest));
}
FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> {
log.info("[{}] Successfully created new partitions {}", clientAppId(), topicName);
result.complete(null);
}).exceptionally(ex -> {
log.warn("[{}] Failed to create subscriptions on new partitions for {}", clientAppId(), topicName, ex);
result.completeExceptionally(ex);
return null;
});
});
}).exceptionally(ex -> {
if (ex.getCause() instanceof PulsarAdminException.NotFoundException) {
// The first partition doesn't exist, so there are currently to subscriptions to recreate
result.complete(null);
} else {
log.warn("[{}] Failed to get list of subscriptions of {}", clientAppId(), topicName.getPartition(0), ex);
result.completeExceptionally(ex);
}
return null;
});
}).exceptionally(ex -> {
log.warn("[{}] Failed to get partition metadata for {}", clientAppId(), topicName.toString());
result.completeExceptionally(ex);
return null;
});
return result;
}
protected void unloadTopic(TopicName topicName, boolean authoritative) {
validateSuperUserAccess();
validateTopicOwnership(topicName, authoritative);
try {
Topic topic = getTopicReference(topicName);
topic.close().get();
log.info("[{}] Successfully unloaded topic {}", clientAppId(), topicName);
} catch (NullPointerException e) {
log.error("[{}] topic {} not found", clientAppId(), topicName);
throw new RestException(Status.NOT_FOUND, "Topic does not exist");
} catch (Exception e) {
log.error("[{}] Failed to unload topic {}, {}", clientAppId(), topicName, e.getMessage(), e);
throw new RestException(e);
}
}
// as described at : (PR: #836) CPP-client old client lib should not be allowed to connect on partitioned-topic.
// So, all requests from old-cpp-client (< v1.21) must be rejected.
// Pulsar client-java lib always passes user-agent as X-Java-$version.
// However, cpp-client older than v1.20 (PR #765) never used to pass it.
// So, request without user-agent and Pulsar-CPP-vX (X < 1.21) must be rejected
private void validateClientVersion() {
if (!pulsar().getConfiguration().isClientLibraryVersionCheckEnabled()) {
return;
}
final String userAgent = httpRequest.getHeader("User-Agent");
if (StringUtils.isBlank(userAgent)) {
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Client lib is not compatible to access partitioned metadata: version in user-agent is not present");
}
// Version < 1.20 for cpp-client is not allowed
if (userAgent.contains(DEPRECATED_CLIENT_VERSION_PREFIX)) {
try {
// Version < 1.20 for cpp-client is not allowed
String[] tokens = userAgent.split(DEPRECATED_CLIENT_VERSION_PREFIX);
String[] splits = tokens.length > 1 ? tokens[1].split("-")[0].trim().split("\\.") : null;
if (splits != null && splits.length > 1) {
if (LEAST_SUPPORTED_CLIENT_VERSION_PREFIX.getMajorVersion() > Integer.parseInt(splits[0])
|| LEAST_SUPPORTED_CLIENT_VERSION_PREFIX.getMinorVersion() > Integer.parseInt(splits[1])) {
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Client lib is not compatible to access partitioned metadata: version " + userAgent
+ " is not supported");
}
}
} catch (RestException re) {
throw re;
} catch (Exception e) {
log.warn("[{}] Failed to parse version {} ", clientAppId(), userAgent);
}
}
return;
}
protected MessageId internalGetLastMessageId(boolean authoritative) {
validateAdminOperationOnTopic(authoritative);
if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {}", clientAppId(), topicName);
throw new RestException(Status.METHOD_NOT_ALLOWED,
"GetLastMessageId on a non-persistent topic is not allowed");
}
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
Position position = topic.getLastMessageId();
int partitionIndex = TopicName.getPartitionIndex(topic.getName());
MessageId messageId = new MessageIdImpl(((PositionImpl)position).getLedgerId(), ((PositionImpl)position).getEntryId(), partitionIndex);
return messageId;
}
}