blob: 26b208e219366a1763db1f962281fd1ceee41a2c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin.impl;
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionCoordinatorAssign;
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.zafarkhaja.semver.Version;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.ScanOutcome;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.EncryptionKeys;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.PartitionedManagedLedgerInfo;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AuthPolicies;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.policies.data.stats.PartitionedTopicStatsImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
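/**
 * Base class holding the shared implementation of the admin operations on persistent topics
 * (listing, permissions, creation, partition updates, deletion, ...), built on {@link AdminResource}.
 */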
public class PersistentTopicsBase extends AdminResource {
// Class-wide SLF4J logger.
private static final Logger log = LoggerFactory.getLogger(PersistentTopicsBase.class);
// NOTE(review): presumably a time-to-live, in minutes, for cached offline topic stats; the code using it
// is not visible in this part of the file -- confirm.
private static final int OFFLINE_TOPIC_STAT_TTL_MINS = 10;
// NOTE(review): looks like the version-string prefix used to recognise C++ clients -- confirm against
// the client version validation code.
private static final String DEPRECATED_CLIENT_VERSION_PREFIX = "Pulsar-CPP-v";
// Semantic version built from major 1 and minor 21. Despite the "_PREFIX" suffix this is a full Version
// object, not a string prefix. NOTE(review): presumably the oldest accepted client version -- confirm.
private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX = Version.forIntegers(1, 21);
/**
 * Lists the persistent topics of the current namespace, blocking until the listing is available.
 *
 * <p>Transaction-internal topics are always left out. When {@code bundle} is present, only topics
 * whose namespace bundle range equals the given value are kept.
 *
 * @param bundle optional bundle range used to narrow the result
 * @return the names of the matching topics
 * @throws RestException 404 when the namespace does not exist, or wrapping any failure raised while
 *         checking the namespace or listing its topics
 */
protected List<String> internalGetList(Optional<String> bundle) {
    validateNamespaceOperation(namespaceName, NamespaceOperation.GET_TOPICS);

    // Validate that namespace exists, throws 404 if it doesn't exist
    final boolean namespaceFound;
    try {
        namespaceFound = namespaceResources().namespaceExists(namespaceName);
    } catch (RestException re) {
        throw re;
    } catch (Exception e) {
        log.error("[{}] Failed to get topic list {}", clientAppId(), namespaceName, e);
        throw new RestException(e);
    }
    if (!namespaceFound) {
        throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
    }

    try {
        final List<String> selected = new ArrayList<>();
        for (String candidate : topicResources().listPersistentTopicsAsync(namespaceName).join()) {
            final TopicName parsed = TopicName.get(candidate);
            if (isTransactionInternalName(parsed)) {
                continue;
            }
            if (bundle.isPresent()) {
                final NamespaceBundle owner = pulsar().getNamespaceService().getNamespaceBundleFactory()
                        .getBundle(parsed);
                if (owner == null || !bundle.get().equals(owner.getBundleRange())) {
                    continue;
                }
            }
            selected.add(candidate);
        }
        return selected;
    } catch (Exception e) {
        log.error("[{}] Failed to get topics list for namespace {}", clientAppId(), namespaceName, e);
        throw new RestException(e);
    }
}
/**
 * Asynchronously lists the persistent topics of the current namespace.
 *
 * <p>Transaction-internal topics are always left out. When {@code bundle} is present, only topics
 * whose namespace bundle range equals the given value are kept.
 *
 * @param bundle optional bundle range used to narrow the result
 * @return a future with the matching topic names; it fails with a 404 {@link RestException} when the
 *         namespace does not exist
 */
protected CompletableFuture<List<String>> internalGetListAsync(Optional<String> bundle) {
    return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
            .thenCompose(ignored -> namespaceResources().namespaceExistsAsync(namespaceName))
            .thenCompose(namespaceFound -> {
                if (!namespaceFound) {
                    throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
                }
                return topicResources().listPersistentTopicsAsync(namespaceName);
            })
            .thenApply(allTopics -> {
                final List<String> selected = new ArrayList<>();
                for (String candidate : allTopics) {
                    final TopicName parsed = TopicName.get(candidate);
                    if (isTransactionInternalName(parsed)) {
                        continue;
                    }
                    if (bundle.isPresent()) {
                        final NamespaceBundle owner = pulsar().getNamespaceService()
                                .getNamespaceBundleFactory().getBundle(parsed);
                        if (owner == null || !bundle.get().equals(owner.getBundleRange())) {
                            continue;
                        }
                    }
                    selected.add(candidate);
                }
                return selected;
            });
}
/**
 * Asynchronously lists every persistent topic of the current namespace, leaving out
 * transaction-internal topics.
 *
 * @return a future with the topic names; it fails with a 404 {@link RestException} when the
 *         namespace does not exist
 */
protected CompletableFuture<List<String>> internalGetListAsync() {
    return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
            .thenCompose(ignored -> namespaceResources().namespaceExistsAsync(namespaceName))
            .thenCompose(namespaceFound -> {
                if (!namespaceFound) {
                    throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
                }
                return topicResources().listPersistentTopicsAsync(namespaceName);
            })
            .thenApply(allTopics -> {
                final List<String> visible = new ArrayList<>();
                for (String candidate : allTopics) {
                    if (!isTransactionInternalName(TopicName.get(candidate))) {
                        visible.add(candidate);
                    }
                }
                return visible;
            });
}
/**
 * Asynchronously lists the partitioned topics of the current namespace for the current topic domain.
 *
 * @return a future with the partitioned topic names; it fails with a 404 {@link RestException} when
 *         the namespace does not exist
 */
protected CompletableFuture<List<String>> internalGetPartitionedTopicListAsync() {
    return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
            .thenCompose(ignored -> namespaceResources().namespaceExistsAsync(namespaceName))
            .thenCompose(namespaceFound -> {
                // A missing namespace is reported as 404 before any listing is attempted.
                if (!namespaceFound) {
                    log.warn("[{}] Failed to get partitioned topic list {}: Namespace does not exist",
                            clientAppId(), namespaceName);
                    throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
                }
                return getPartitionedTopicListAsync(TopicDomain.getEnum(domain()));
            });
}
/**
 * Computes the effective permissions on the current topic, per role.
 *
 * <p>The result starts from the namespace-level permissions and is then merged with the topic-level
 * ones: a role present at both levels gets the union of both action sets.
 *
 * @return a future with the role-to-actions map; it fails with a 404 {@link RestException} when the
 *         namespace policies are absent
 */
protected CompletableFuture<Map<String, Set<AuthAction>>> internalGetPermissionsOnTopic() {
    // Tenant admin access is validated before the namespace policies are read.
    return validateAdminAccessForTenantAsync(namespaceName.getTenant())
            .thenCompose(ignored -> namespaceResources().getPoliciesAsync(namespaceName))
            .thenApply(policies -> {
                if (!policies.isPresent()) {
                    throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
                }
                final AuthPolicies auth = policies.get().auth_policies;
                final String topicUri = topicName.toString();

                // Namespace level first ...
                final Map<String, Set<AuthAction>> effective = Maps.newHashMap();
                effective.putAll(auth.getNamespaceAuthentication());

                // ... then layer the topic level on top of it.
                if (!auth.getTopicAuthentication().containsKey(topicUri)) {
                    return effective;
                }
                for (Map.Entry<String, Set<AuthAction>> grant
                        : auth.getTopicAuthentication().get(topicUri).entrySet()) {
                    final String role = grant.getKey();
                    final Set<AuthAction> topicLevel = grant.getValue();
                    final Set<AuthAction> merged = effective.containsKey(role)
                            ? Sets.union(effective.get(role), topicLevel)
                            : topicLevel;
                    effective.put(role, merged);
                }
                return effective;
            });
}
/**
 * Rejects the creation of topics whose name matches the transaction-internal naming scheme.
 *
 * @param topicName the topic about to be created
 * @throws RestException 400 when the name is a transaction-internal name
 */
protected void validateCreateTopic(TopicName topicName) {
    if (!isTransactionInternalName(topicName)) {
        return;
    }
    log.warn("Forbidden to create transaction internal topic: {}", topicName);
    throw new RestException(Status.BAD_REQUEST, "Cannot create topic in system topic format!");
}
/**
 * Runs the two checks required for an admin operation on the current topic: tenant admin access
 * first, then topic ownership.
 *
 * @param authoritative forwarded to the topic ownership validation
 */
public void validateAdminOperationOnTopic(boolean authoritative) {
    final String tenant = topicName.getTenant();
    validateAdminAccessForTenant(tenant);
    validateTopicOwnership(topicName, authoritative);
}
/**
 * Grants {@code actions} to {@code role} on a single topic through the broker's authorization service.
 *
 * <p>Failures of the grant are translated into {@link RestException}s:
 * <ul>
 *   <li>{@code MetadataStoreException.NotFoundException} / {@link IllegalArgumentException}: 404</li>
 *   <li>{@code MetadataStoreException.BadVersionException} / {@link IllegalStateException}: 409</li>
 *   <li>anything else: wrapped as-is</li>
 * </ul>
 *
 * @param topicUri the topic (or partition) receiving the grant
 * @param role     the role being granted access
 * @param actions  the actions granted to the role
 * @return a future completing once the grant is done; it fails with a 501 {@link RestException}
 *         when the broker has no authorization service
 */
private CompletableFuture<Void> grantPermissionsAsync(TopicName topicUri, String role, Set<AuthAction> actions) {
    AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
    if (null == authService) {
        String msg = "Authorization is not enabled";
        return FutureUtil.failedFuture(new RestException(Status.NOT_IMPLEMENTED, msg));
    }
    return authService.grantPermissionAsync(topicUri, actions, role, null/*additional auth-data json*/)
            .thenAccept(__ -> log.info("[{}] Successfully granted access for role {}: {} - topic {}",
                    clientAppId(), role, actions, topicUri))
            .exceptionally(ex -> {
                Throwable realCause = FutureUtil.unwrapCompletionException(ex);
                //The IllegalArgumentException and the IllegalStateException were historically thrown by the
                // grantPermissionAsync method, so we catch them here to ensure backwards compatibility.
                if (realCause instanceof MetadataStoreException.NotFoundException
                        || realCause instanceof IllegalArgumentException) {
                    log.warn("[{}] Failed to set permissions for topic {}: Namespace does not exist",
                            clientAppId(), topicUri, realCause);
                    throw new RestException(Status.NOT_FOUND, "Topic's namespace does not exist");
                } else if (realCause instanceof MetadataStoreException.BadVersionException
                        || realCause instanceof IllegalStateException) {
                    log.warn("[{}] Failed to set permissions for topic {}: {}", clientAppId(), topicUri,
                            realCause.getMessage(), realCause);
                    throw new RestException(Status.CONFLICT, "Concurrent modification");
                } else {
                    // This path sets permissions; the message used to say "get", which was misleading
                    // when reading broker logs.
                    log.error("[{}] Failed to set permissions for topic {}", clientAppId(), topicUri,
                            realCause);
                    throw new RestException(realCause);
                }
            });
}
/**
 * Grants {@code actions} to {@code role} on the current topic and resumes {@code asyncResponse}
 * with 204 (no content) on success.
 *
 * <p>For a partitioned topic the grant is applied to each partition in order, one after the other,
 * and finally to the partitioned topic name itself. Any failure resumes the response exceptionally.
 *
 * @param asyncResponse the response to resume once the operation finishes
 * @param role          the role being granted access
 * @param actions       the actions granted to the role
 */
protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse, String role,
                                               Set<AuthAction> actions) {
    // Tenant admin access and the policies read-only check are validated before any grant is attempted.
    validateAdminAccessForTenantAsync(namespaceName.getTenant())
            .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
            .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, true, false))
            .thenCompose(metadata -> {
                int numPartitions = metadata.partitions;
                // Chain the grants so they run sequentially rather than concurrently.
                CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
                for (int i = 0; i < numPartitions; i++) {
                    TopicName topicNamePartition = topicName.getPartition(i);
                    future = future.thenCompose(unused -> grantPermissionsAsync(topicNamePartition, role,
                            actions));
                }
                return future.thenCompose(unused -> grantPermissionsAsync(topicName, role, actions))
                        .thenAccept(unused -> asyncResponse.resume(Response.noContent().build()));
            })
            .exceptionally(ex -> {
                Throwable realCause = FutureUtil.unwrapCompletionException(ex);
                // The message used to say "get permissions" although this is the grant path.
                log.error("[{}] Failed to grant permissions for topic {}", clientAppId(), topicName, realCause);
                resumeAsyncResponseExceptionally(asyncResponse, realCause);
                return null;
            });
}
/**
 * Removes the topic-level permissions of {@code role} on {@code topicUri} from the namespace policies.
 *
 * @param topicUri the full topic (or partition) name whose topic-level entry is edited
 * @param role     the role losing its topic-level permissions
 * @return a future completing once the policies are stored; it fails with a 404 {@link RestException}
 *         when the namespace policies are absent, and with a 412 when the role has no permission set
 *         at the topic level
 */
private CompletableFuture<Void> revokePermissionsAsync(String topicUri, String role) {
    return namespaceResources().getPoliciesAsync(namespaceName).thenCompose(
            policiesOptional -> {
                Policies policies = policiesOptional.orElseThrow(() ->
                        new RestException(Status.NOT_FOUND, "Namespace does not exist"));
                if (!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
                        || !policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role)) {
                    log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level {}",
                            clientAppId(), role, topicUri);
                    return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
                            "Permissions are not set at the topic level"));
                }
                // Write the new policies to metadata store
                return namespaceResources().setPoliciesAsync(namespaceName, p -> {
                    // The update function receives the policies as stored at write time, which may
                    // differ from the snapshot checked above. Tolerate a topic entry that has since
                    // disappeared instead of failing with a NullPointerException.
                    Map<String, Set<AuthAction>> topicRoles =
                            p.auth_policies.getTopicAuthentication().get(topicUri);
                    if (topicRoles != null) {
                        topicRoles.remove(role);
                    }
                    return p;
                }).thenAccept(__ ->
                        log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role,
                                topicUri)
                );
            }
    );
}
/**
 * Revokes the topic-level permissions of {@code role} on the current topic and resumes
 * {@code asyncResponse} with 204 (no content) on success.
 *
 * <p>For a partitioned topic the revocation is chained over each partition in order and finally
 * applied to the partitioned topic name itself. Any failure resumes the response exceptionally.
 *
 * @param asyncResponse the response to resume once the operation finishes
 * @param role          the role losing its permissions
 */
protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) {
    // Tenant admin access and the policies read-only check are validated before any revocation.
    validateAdminAccessForTenantAsync(namespaceName.getTenant())
            .thenCompose(ignored -> validatePoliciesReadOnlyAccessAsync())
            .thenCompose(ignored -> getPartitionedTopicMetadataAsync(topicName, true, false))
            .thenCompose(metadata -> {
                // Each step is appended to the previous one so revocations run one at a time.
                CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
                for (int index = 0; index < metadata.partitions; index++) {
                    final TopicName partition = topicName.getPartition(index);
                    chain = chain.thenComposeAsync(done ->
                            revokePermissionsAsync(partition.toString(), role));
                }
                return chain
                        .thenComposeAsync(done -> revokePermissionsAsync(topicName.toString(), role))
                        .thenAccept(done -> asyncResponse.resume(Response.noContent().build()));
            })
            .exceptionally(failure -> {
                final Throwable realCause = FutureUtil.unwrapCompletionException(failure);
                log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicName, realCause);
                resumeAsyncResponseExceptionally(asyncResponse, realCause);
                return null;
            });
}
/**
 * Creates the current topic as a non-partitioned topic.
 *
 * <p>Before creating it, the topic name, (for global topics) the global namespace ownership, the
 * topic ownership and the {@code CREATE_TOPIC} namespace operation are validated in that order.
 *
 * @param authoritative forwarded to the topic ownership validation
 * @param properties    properties handed to the broker service when the topic is created
 * @return a future completing once the topic exists; it fails with a 409 {@link RestException} when
 *         a partitioned or non-partitioned topic with the same name is already there
 */
protected CompletableFuture<Void> internalCreateNonPartitionedTopicAsync(boolean authoritative,
                                                                         Map<String, String> properties) {
    final CompletableFuture<Void> nameCheck = validateNonPartitionTopicNameAsync(topicName.getLocalName());
    final CompletableFuture<Void> preconditions = topicName.isGlobal()
            ? nameCheck.thenCompose(ignored -> validateGlobalNamespaceOwnershipAsync(namespaceName))
            : nameCheck;

    return preconditions
            .thenCompose(ignored -> validateTopicOwnershipAsync(topicName, authoritative))
            .thenCompose(ignored -> validateNamespaceOperationAsync(topicName.getNamespaceObject(),
                    NamespaceOperation.CREATE_TOPIC))
            .thenCompose(ignored -> getPartitionedTopicMetadataAsync(topicName, false, false))
            .thenCompose(partitionMetadata -> {
                // A partitioned topic already owns this name.
                if (partitionMetadata.partitions > 0) {
                    log.warn("[{}] Partitioned topic with the same name already exists {}", clientAppId(),
                            topicName);
                    throw new RestException(Status.CONFLICT, "This topic already exists");
                }
                return pulsar().getBrokerService().getTopicIfExists(topicName.toString());
            })
            .thenCompose(existing -> {
                // A non-partitioned topic already owns this name.
                if (existing.isPresent()) {
                    log.error("[{}] Topic {} already exists", clientAppId(), topicName);
                    throw new RestException(Status.CONFLICT, "This topic already exists");
                }
                return pulsar().getBrokerService().getTopic(topicName.toString(), true, properties);
            })
            .thenAccept(created ->
                    log.info("[{}] Successfully created non-partitioned topic {}", clientAppId(), topicName));
}
/**
* Updates the number of partitions of an existing partitioned topic. The partitioned topic must
* already exist and the new number of partitions must be greater than the existing number. Decrementing
* the number of partitions would require deleting the topic, which is not supported.
*
* Producers and consumers that were created before the update cannot see the newly created partitions;
* they have to be recreated by the application to connect to the new partitions as well. Therefore,
* partition ordering can be violated at the producers until all of them have been restarted.
*
* @param numPartitions the new number of partitions; must be greater than 0 and, when the broker's
* {@code maxNumPartitionsPerPartitionedTopic} setting is positive, not greater than it
* @param updateLocalTopicOnly when true, {@code validatePartitionTopicUpdateAsync} is skipped and, for a
* global topic in a replicated namespace, neither the other clusters nor the stored partitioned
* topic metadata are updated
* @param authoritative forwarded to the topic ownership validation
* @param force when true, {@code validatePartitionTopicUpdateAsync} is skipped; also forwarded to
* {@code updatePartitionedTopic} on the non-replicated path
* @return a future completing when the update is done; it fails with a {@link RestException}
* (406 for an invalid partition count, 403 when the local cluster is not in the replication list)
*/
protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int numPartitions,
boolean updateLocalTopicOnly,
boolean authoritative, boolean force) {
// Reject non-positive partition counts before doing any asynchronous work.
if (numPartitions <= 0) {
return FutureUtil.failedFuture(new RestException(Status.NOT_ACCEPTABLE,
"Number of partitions should be more than 0"));
}
return validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicPolicyOperationAsync(topicName, PolicyName.PARTITION,
PolicyOperation.WRITE))
.thenCompose(__ -> {
// The partition-update validation only runs for a regular, non-forced update.
if (!updateLocalTopicOnly && !force) {
return validatePartitionTopicUpdateAsync(topicName.getLocalName(), numPartitions);
} else {
return CompletableFuture.completedFuture(null);
}
})
.thenCompose(__ -> {
// A non-positive configured maximum disables the upper bound check.
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
if (maxPartitions > 0 && numPartitions > maxPartitions) {
throw new RestException(Status.NOT_ACCEPTABLE,
"Number of partitions should be less than or equal to " + maxPartitions);
}
// Only do the validation if it's the first hop.
if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) {
return getNamespaceReplicatedClustersAsync(topicName.getNamespaceObject())
.thenApply(clusters -> {
// The local cluster must be one of the namespace's replication clusters.
if (!clusters.contains(pulsar().getConfig().getClusterName())) {
log.error("[{}] local cluster is not part of replicated cluster for namespace {}",
clientAppId(), topicName);
throw new RestException(Status.FORBIDDEN, "Local cluster is not part of replicate"
+ " cluster list");
}
return clusters;
})
// Create the partitions, then the subscriptions, carrying the cluster set through each step.
.thenCompose(clusters -> tryCreatePartitionsAsync(numPartitions).thenApply(ignore ->
clusters))
.thenCompose(clusters -> createSubscriptions(topicName, numPartitions).thenApply(ignore ->
clusters))
.thenCompose(clusters -> {
if (!updateLocalTopicOnly) {
// Update the other clusters first, then store the new partition count while keeping
// the existing metadata properties.
return updatePartitionInOtherCluster(numPartitions, clusters)
.thenCompose(v -> namespaceResources().getPartitionedTopicResources()
.updatePartitionedTopicAsync(topicName, p ->
new PartitionedTopicMetadata(numPartitions,
p.properties)
));
} else {
return CompletableFuture.completedFuture(null);
}
});
} else {
// Non-global topic or non-replicated namespace: create the partitions, then update the topic.
return tryCreatePartitionsAsync(numPartitions)
.thenCompose(ignore -> updatePartitionedTopic(topicName, numPartitions, force));
}
});
}
/**
 * Creates the partitions recorded in the partitioned topic metadata of the current topic and resumes
 * {@code asyncResponse} with 204 (no content) on success.
 *
 * <p>When no metadata is returned the response is resumed with a 404 instead of being left
 * unanswered. Any other failure resumes the response exceptionally.
 *
 * @param asyncResponse the response to resume once the operation finishes
 */
protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
    getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
        if (metadata == null) {
            // Without metadata there is nothing to create; answer explicitly so the request
            // does not hang until it times out.
            asyncResponse.resume(new RestException(Status.NOT_FOUND,
                    getPartitionedTopicNotFoundErrorMessage(topicName.toString())));
            return;
        }
        tryCreatePartitionsAsync(metadata.partitions).thenAccept(v -> {
            asyncResponse.resume(Response.noContent().build());
        }).exceptionally(e -> {
            // Keep the cause in the log; the message alone does not say why creation failed.
            log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName, e);
            resumeAsyncResponseExceptionally(asyncResponse, e);
            return null;
        });
    }).exceptionally(ex -> {
        // If the exception is not redirect exception we need to log it.
        if (!isRedirectException(ex)) {
            log.error("[{}] Failed to create partitions for topic {}",
                    clientAppId(), topicName, ex);
        }
        resumeAsyncResponseExceptionally(asyncResponse, ex);
        return null;
    });
}
/**
 * Stores the delayed delivery settings in the topic policies of the current topic.
 *
 * <p>A {@code null} argument resets both the enabled flag and the tick time to {@code null}.
 *
 * @param deliveryPolicies the settings to apply, or {@code null} to clear them
 * @param isGlobal         which topic policies to load, and the value written to their global flag
 * @return a future completing once the topic policies have been updated
 */
protected CompletableFuture<Void> internalSetDelayedDeliveryPolicies(DelayedDeliveryPolicies deliveryPolicies,
                                                                     boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenCompose(existing -> {
                // Start from the stored policies when there are some, otherwise from a blank set.
                final TopicPolicies updated = existing.orElseGet(TopicPolicies::new);
                updated.setIsGlobal(isGlobal);
                if (deliveryPolicies == null) {
                    updated.setDelayedDeliveryEnabled(null);
                    updated.setDelayedDeliveryTickTimeMillis(null);
                } else {
                    updated.setDelayedDeliveryEnabled(deliveryPolicies.isActive());
                    updated.setDelayedDeliveryTickTimeMillis(deliveryPolicies.getTickTime());
                }
                return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, updated);
            });
}
/**
 * Asks every cluster in {@code clusters}, except the local one, to update the partition count of the
 * current topic through that cluster's admin client.
 *
 * @param numPartitions the new number of partitions
 * @param clusters      the clusters to update; the local cluster, if listed, is skipped
 * @return a future completing when all remote updates have completed
 */
private CompletableFuture<Void> updatePartitionInOtherCluster(int numPartitions, Set<String> clusters) {
    // Loop-invariant, so it is resolved once instead of once per cluster.
    final String localCluster = pulsar().getConfig().getClusterName();
    // One slot per cluster other than the local one. Clamp at zero because ArrayList rejects a
    // negative initial capacity, which an empty cluster set would otherwise produce.
    List<CompletableFuture<Void>> results = new ArrayList<>(Math.max(0, clusters.size() - 1));
    for (String cluster : clusters) {
        if (cluster.equals(localCluster)) {
            continue;
        }
        CompletableFuture<Void> updatePartitionTopicFuture =
                pulsar().getPulsarResources().getClusterResources().getClusterAsync(cluster)
                        .thenApply(clusterDataOp ->
                                pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterDataOp))
                        .thenCompose(pulsarAdmin ->
                                pulsarAdmin.topics().updatePartitionedTopicAsync(
                                        topicName.toString(), numPartitions, true, false));
        results.add(updatePartitionTopicFuture);
    }
    return FutureUtil.waitForAll(results);
}
/**
 * Blocking variant of {@link #internalGetPartitionedMetadataAsync(boolean, boolean)}.
 *
 * @param authoritative          forwarded to the asynchronous variant
 * @param checkAllowAutoCreation forwarded to the asynchronous variant
 * @return the partitioned topic metadata of the current topic
 */
protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean authoritative,
                                                                  boolean checkAllowAutoCreation) {
    final PartitionedTopicMetadata metadata =
            sync(() -> internalGetPartitionedMetadataAsync(authoritative, checkAllowAutoCreation));
    return metadata;
}
/**
 * Fetches the partitioned topic metadata of the current topic and runs one follow-up check chosen
 * from the partition count before handing the metadata back.
 *
 * @param authoritative          forwarded to the metadata lookup
 * @param checkAllowAutoCreation forwarded to the metadata lookup; when true, a zero partition count
 *                               is accepted without checking that the topic exists
 * @return a future with the metadata, failing when the follow-up check fails
 */
protected CompletableFuture<PartitionedTopicMetadata> internalGetPartitionedMetadataAsync(
        boolean authoritative,
        boolean checkAllowAutoCreation) {
    return getPartitionedTopicMetadataAsync(topicName, authoritative, checkAllowAutoCreation)
            .thenCompose(metadata -> {
                final CompletableFuture<Void> followUp;
                if (metadata.partitions > 1) {
                    followUp = internalValidateClientVersionAsync();
                } else if (metadata.partitions == 0 && !checkAllowAutoCreation) {
                    // Zero partitions may mean a non-partitioned topic, so make sure it exists.
                    // With checkAllowAutoCreation the client creates the topic itself when it is
                    // missing, hence the existence check would be wrong in that case.
                    followUp = internalCheckTopicExists(topicName);
                } else {
                    followUp = CompletableFuture.completedFuture(null);
                }
                return followUp.thenApply(ignored -> metadata);
            });
}
/**
 * Returns the properties of the current topic.
 *
 * <p>For a partition name, or for a topic whose partitioned metadata reports zero partitions, the
 * properties are read from the loaded topic's managed ledger; otherwise they come from the
 * partitioned topic metadata.
 *
 * @param authoritative forwarded to the topic ownership validation
 * @return a future with the topic properties
 */
protected CompletableFuture<Map<String, String>> internalGetPropertiesAsync(boolean authoritative) {
    return validateTopicOwnershipAsync(topicName, authoritative)
            .thenCompose(ignored -> validateTopicOperationAsync(topicName, TopicOperation.GET_METADATA))
            .thenCompose(ignored -> {
                if (!topicName.isPartitioned()) {
                    return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
                            .thenCompose(partitionedMeta -> {
                                if (partitionedMeta.partitions == 0) {
                                    return getPropertiesAsync();
                                }
                                return CompletableFuture.completedFuture(partitionedMeta.properties);
                            });
                }
                return getPropertiesAsync();
            });
}
/**
 * Reads the properties of the current topic from its managed ledger.
 *
 * @return a future with the properties; it fails with a 404 {@link RestException} when the broker
 *         service reports that the topic does not exist
 */
private CompletableFuture<Map<String, String>> getPropertiesAsync() {
    return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
            .thenApply(maybeTopic -> {
                final Topic loaded = maybeTopic.orElseThrow(() -> new RestException(Status.NOT_FOUND,
                        getTopicNotFoundErrorMessage(topicName.toString())));
                final PersistentTopic persistent = (PersistentTopic) loaded;
                return persistent.getManagedLedger().getProperties();
            });
}
/**
 * Verifies through the namespace service that the given topic exists.
 *
 * @param topicName the topic to look up
 * @return a future completing normally when the topic exists and failing with a 404
 *         {@link RestException} otherwise
 */
protected CompletableFuture<Void> internalCheckTopicExists(TopicName topicName) {
    return pulsar().getNamespaceService().checkTopicExists(topicName)
            .thenAccept(found -> {
                if (found) {
                    return;
                }
                throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString()));
            });
}
/**
* Deletes the current partitioned topic and resumes {@code asyncResponse} with 204 (no content) on
* success.
*
* Steps, in order: validate topic ownership and the DELETE_TOPIC namespace operation, fetch the
* partitioned topic metadata, and (when it reports at least one partition) remove the partitions'
* authentication policies and then delete the partitions. Finally the partitioned topic entry itself
* is deleted.
*
* Failures are mapped onto the response as follows: PreconditionFailedException becomes 412,
* a WebApplicationException is passed through, MetadataStoreException.NotFoundException becomes 404,
* MetadataStoreException.BadVersionException becomes 409, and a PulsarAdminException or any other
* cause is wrapped in a RestException.
*
* @param asyncResponse the response to resume once the operation finishes
* @param authoritative forwarded to the topic ownership validation
* @param force forwarded to the deletion of each partition
*/
protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse,
boolean authoritative,
boolean force) {
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateNamespaceOperationAsync(topicName.getNamespaceObject(),
NamespaceOperation.DELETE_TOPIC))
.thenCompose(__ -> pulsar().getBrokerService()
.fetchPartitionedTopicMetadataAsync(topicName)
.thenCompose(partitionedMeta -> {
final int numPartitions = partitionedMeta.partitions;
// No partitions reported: skip the per-partition cleanup entirely.
if (numPartitions < 1){
return CompletableFuture.completedFuture(null);
}
// Drop the authentication policies first, then delete the partitions themselves.
return internalRemovePartitionsAuthenticationPoliciesAsync(numPartitions)
.thenCompose(unused -> internalRemovePartitionsTopicAsync(numPartitions, force));
})
// Only tries to delete the znode for partitioned topic when all its partitions are successfully deleted
).thenCompose(__ -> namespaceResources()
.getPartitionedTopicResources().deletePartitionedTopicAsync(topicName))
.thenAccept(__ -> {
log.info("[{}] Deleted partitioned topic {}", clientAppId(), topicName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
// Classify on the unwrapped cause rather than on the CompletionException wrapper.
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
if (realCause instanceof PreconditionFailedException) {
asyncResponse.resume(
new RestException(Status.PRECONDITION_FAILED,
"Topic has active producers/subscriptions"));
} else if (realCause instanceof WebApplicationException){
asyncResponse.resume(realCause);
} else if (realCause instanceof MetadataStoreException.NotFoundException) {
// NOTE(review): the log blames the namespace policies while the response reports the partitioned
// topic as not found -- confirm which resource is actually missing on this path.
log.warn("Namespace policies of {} not found", topicName.getNamespaceObject());
asyncResponse.resume(new RestException(
new RestException(Status.NOT_FOUND,
getPartitionedTopicNotFoundErrorMessage(topicName.toString()))));
} else if (realCause instanceof PulsarAdminException) {
asyncResponse.resume(new RestException((PulsarAdminException) realCause));
} else if (realCause instanceof MetadataStoreException.BadVersionException) {
asyncResponse.resume(new RestException(
new RestException(Status.CONFLICT, "Concurrent modification")));
} else {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.error("[{}] Fail to Delete partitioned topic {}", clientAppId(), topicName, realCause);
}
asyncResponse.resume(new RestException(realCause));
}
return null;
});
}
/**
 * Deletes partitions {@code 0 .. numPartitions - 1} of the current topic through the admin client.
 *
 * <p>A partition reported as not found counts as deleted: the client might never have created a
 * producer on it, or it might have been deleted earlier. Any other failure fails the returned
 * future, even if only a single partition could not be deleted.
 *
 * @param numPartitions the number of partitions to delete
 * @param force         forwarded to each partition deletion
 * @return a future completing when every partition deletion has completed
 */
private CompletableFuture<Void> internalRemovePartitionsTopicAsync(int numPartitions, boolean force) {
    final List<CompletableFuture<Void>> deletions = new ArrayList<>();
    for (int index = 0; index < numPartitions; index++) {
        final TopicName partition = topicName.getPartition(index);
        try {
            final CompletableFuture<Void> outcome = new CompletableFuture<>();
            pulsar().getAdminClient().topics()
                    .deleteAsync(partition.toString(), force)
                    .whenComplete((ignored, failure) -> {
                        if (failure == null) {
                            outcome.complete(null);
                            return;
                        }
                        final Throwable realCause = FutureUtil.unwrapCompletionException(failure);
                        if (!(realCause instanceof NotFoundException)) {
                            log.error("[{}] Failed to delete partition {}", clientAppId(),
                                    partition, realCause);
                            outcome.completeExceptionally(realCause);
                            return;
                        }
                        // The 404 is ignored on purpose, see the method documentation.
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Partition not found: {}", clientAppId(),
                                    partition);
                        }
                        outcome.complete(null);
                    });
            deletions.add(outcome);
        } catch (PulsarServerException ex) {
            log.error("[{}] Failed to get admin client while delete partition {}",
                    clientAppId(), partition, ex);
            deletions.add(FutureUtil.failedFuture(ex));
        }
    }
    return FutureUtil.waitForAll(deletions);
}
/**
 * Removes the topic-level authentication entries of the current partitioned topic (each partition
 * and the partitioned topic name itself) from the namespace policies.
 *
 * <p>Namespace policies that cannot be found are treated as success.
 *
 * @param numPartitions the number of partitions whose entries are removed
 * @return a future completing once the policies are updated, or failing with the unwrapped cause
 */
private CompletableFuture<Void> internalRemovePartitionsAuthenticationPoliciesAsync(int numPartitions) {
    final CompletableFuture<Void> result = new CompletableFuture<>();
    pulsar().getPulsarResources().getNamespaceResources()
            .setPoliciesAsync(topicName.getNamespaceObject(), policies -> {
                for (int index = 0; index < numPartitions; index++) {
                    policies.auth_policies.getTopicAuthentication()
                            .remove(topicName.getPartition(index).toString());
                }
                policies.auth_policies.getTopicAuthentication().remove(topicName.toString());
                return policies;
            })
            .whenComplete((ignored, failure) -> {
                if (failure == null) {
                    log.info("Successfully delete authentication policies for partitioned topic {}", topicName);
                    result.complete(null);
                    return;
                }
                final Throwable realCause = FutureUtil.unwrapCompletionException(failure);
                if (realCause instanceof MetadataStoreException.NotFoundException) {
                    log.warn("Namespace policies of {} not found", topicName.getNamespaceObject());
                    result.complete(null);
                    return;
                }
                log.error("Failed to delete authentication policies for partitioned topic {}",
                        topicName, failure);
                result.completeExceptionally(realCause);
            });
    return result;
}
/**
 * Unloads the topic and resumes {@code asyncResponse} with the outcome.
 *
 * <p>For a global topic the global namespace ownership is validated first. If the topic name already
 * denotes a partition it is unloaded directly (through the transaction coordinator path when
 * {@code isTransactionCoordinatorAssign} matches). Otherwise the partitioned metadata is fetched and,
 * when it reports partitions, each partition is unloaded through the admin client.
 *
 * @param asyncResponse response resumed with 204 No Content on success, or with the failure
 * @param authoritative forwarded to the metadata lookup and to the single-topic unload helpers
 */
protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authoritative) {
log.info("[{}] Unloading topic {}", clientAppId(), topicName);
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
future = CompletableFuture.completedFuture(null);
}
future.thenAccept(__ -> {
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
if (isTransactionCoordinatorAssign(topicName)) {
internalUnloadTransactionCoordinatorAsync(asyncResponse, authoritative);
} else {
internalUnloadNonPartitionedTopicAsync(asyncResponse, authoritative);
}
} else {
getPartitionedTopicMetadataAsync(topicName, authoritative, false)
.thenAccept(meta -> {
if (meta.partitions > 0) {
// Fan out: one admin-client unload request per partition.
final List<CompletableFuture<Void>> futures =
Lists.newArrayListWithCapacity(meta.partitions);
for (int i = 0; i < meta.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics().unloadAsync(
topicNamePartition.toString()));
} catch (Exception e) {
// Failing to issue a request aborts the whole operation; requests already issued are not awaited.
log.error("[{}] Failed to unload topic {}", clientAppId(),
topicNamePartition, e);
asyncResponse.resume(new RestException(e));
return;
}
}
FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
// The branch taken depends on the cause of the aggregate failure, not on the failure itself.
Throwable th = exception.getCause();
if (th instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, th.getMessage()));
} else if (th instanceof WebApplicationException) {
asyncResponse.resume(th);
} else {
log.error("[{}] Failed to unload topic {}", clientAppId(), topicName,
exception);
asyncResponse.resume(new RestException(exception));
}
} else {
asyncResponse.resume(Response.noContent().build());
}
return null;
});
} else {
// Metadata reports no partitions: treat the name as a single topic.
internalUnloadNonPartitionedTopicAsync(asyncResponse, authoritative);
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.error("[{}] Failed to get partitioned metadata while unloading topic {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.error("[{}] Failed to validate the global namespace ownership while unloading topic {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
/**
 * Looks up the delayed-delivery policies of the topic.
 *
 * <p>A topic-level value is returned only when both the enabled flag and the tick time are set on the
 * topic policies. Otherwise, when {@code applied} is true, the namespace-level policies are used and,
 * if those are absent too, a value built from the broker configuration.
 *
 * @param applied  whether to fall back to namespace and broker level values
 * @param isGlobal forwarded to the topic-policies lookup
 * @return a future holding the resolved policies, or {@code null} when nothing applies
 */
protected CompletableFuture<DelayedDeliveryPolicies> internalGetDelayedDeliveryPolicies(boolean applied,
                                                                                        boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenApply(op -> {
                final TopicPolicies topicLevel = op.orElseGet(TopicPolicies::new);
                if (topicLevel.isDelayedDeliveryEnabledSet()
                        && topicLevel.isDelayedDeliveryTickTimeMillisSet()) {
                    return DelayedDeliveryPolicies.builder()
                            .tickTime(topicLevel.getDelayedDeliveryTickTimeMillis())
                            .active(topicLevel.getDelayedDeliveryEnabled())
                            .build();
                }
                if (!applied) {
                    return null;
                }
                final DelayedDeliveryPolicies namespaceLevel =
                        getNamespacePolicies(namespaceName).delayed_delivery_policies;
                if (namespaceLevel != null) {
                    return namespaceLevel;
                }
                // Neither topic nor namespace define a value: use the broker configuration.
                return DelayedDeliveryPolicies.builder()
                        .tickTime(pulsar().getConfiguration().getDelayedDeliveryTickTimeMillis())
                        .active(pulsar().getConfiguration().isDelayedDeliveryEnabled())
                        .build();
            });
}
/**
 * Looks up the offload policies of the topic.
 *
 * @param applied  when true, the topic-level value is merged with the namespace policies and the
 *                 broker configuration properties via {@code OffloadPoliciesImpl.mergeConfiguration}
 * @param isGlobal forwarded to the topic-policies lookup
 * @return a future holding the topic-level policies (possibly {@code null}) or the merged result
 */
protected CompletableFuture<OffloadPoliciesImpl> internalGetOffloadPolicies(boolean applied, boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenApply(op -> {
                final OffloadPoliciesImpl topicLevel = op.map(TopicPolicies::getOffloadPolicies).orElse(null);
                if (!applied) {
                    return topicLevel;
                }
                final OffloadPoliciesImpl namespaceLevel =
                        (OffloadPoliciesImpl) getNamespacePolicies(namespaceName).offload_policies;
                return OffloadPoliciesImpl.mergeConfiguration(
                        topicLevel, namespaceLevel, pulsar().getConfiguration().getProperties());
            });
}
/**
 * Stores the given offload policies on the topic policies, creating the topic policies when none
 * exist yet.
 *
 * @param offloadPolicies value to store (passed to the setter as is)
 * @param isGlobal        forwarded to the lookup and recorded on the stored topic policies
 * @return the future of the topic-policies update
 */
protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPoliciesImpl offloadPolicies,
                                                             boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenCompose(existing -> {
                final TopicPolicies updated = existing.orElseGet(TopicPolicies::new);
                updated.setOffloadPolicies(offloadPolicies);
                updated.setIsGlobal(isGlobal);
                return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, updated);
            });
}
/**
 * Looks up the inactive-topic policies of the topic.
 *
 * @param applied  when true and no topic-level value exists, fall back to the namespace policies and
 *                 then to a value built from the broker configuration
 * @param isGlobal forwarded to the topic-policies lookup
 * @return a future holding the resolved policies, or {@code null} when nothing applies
 */
protected CompletableFuture<InactiveTopicPolicies> internalGetInactiveTopicPolicies(boolean applied,
                                                                                    boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenApply(op -> {
                final Optional<InactiveTopicPolicies> topicLevel = op.map(TopicPolicies::getInactiveTopicPolicies);
                if (topicLevel.isPresent()) {
                    return topicLevel.get();
                }
                if (!applied) {
                    return null;
                }
                final InactiveTopicPolicies namespaceLevel =
                        getNamespacePolicies(namespaceName).inactive_topic_policies;
                if (namespaceLevel != null) {
                    return namespaceLevel;
                }
                return new InactiveTopicPolicies(
                        config().getBrokerDeleteInactiveTopicsMode(),
                        config().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
                        config().isBrokerDeleteInactiveTopicsEnabled());
            });
}
/**
 * Stores the given inactive-topic policies on the topic policies, creating the topic policies when
 * none exist yet.
 *
 * @param inactiveTopicPolicies value to store (passed to the setter as is)
 * @param isGlobal              forwarded to the lookup and recorded on the stored topic policies
 * @return the future of the topic-policies update
 */
protected CompletableFuture<Void> internalSetInactiveTopicPolicies(InactiveTopicPolicies inactiveTopicPolicies,
                                                                   boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenCompose(existing -> {
                final TopicPolicies updated = existing.orElseGet(TopicPolicies::new);
                updated.setIsGlobal(isGlobal);
                updated.setInactiveTopicPolicies(inactiveTopicPolicies);
                return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, updated);
            });
}
/**
 * Looks up the max-unacked-messages-per-subscription limit of the topic.
 *
 * @param applied  when true and no topic-level value exists, fall back to the namespace policies and
 *                 then to the broker configuration
 * @param isGlobal forwarded to the topic-policies lookup
 * @return a future holding the resolved limit, or {@code null} when nothing applies
 */
protected CompletableFuture<Integer> internalGetMaxUnackedMessagesOnSubscription(boolean applied,
                                                                                 boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenApply(op -> {
                final Optional<Integer> topicLevel = op.map(TopicPolicies::getMaxUnackedMessagesOnSubscription);
                if (topicLevel.isPresent()) {
                    return topicLevel.get();
                }
                if (!applied) {
                    return null;
                }
                final Integer namespaceLevel = getNamespacePolicies(namespaceName)
                        .max_unacked_messages_per_subscription;
                if (namespaceLevel != null) {
                    return namespaceLevel;
                }
                final Integer brokerLevel = config().getMaxUnackedMessagesPerSubscription();
                return brokerLevel;
            });
}
/**
 * Stores the max-unacked-messages-per-subscription limit on the topic policies.
 *
 * @param maxUnackedNum new limit; {@code null} is passed to the setter unchanged
 * @param isGlobal      forwarded to the lookup and recorded on the stored topic policies
 * @return the future of the topic-policies update
 * @throws RestException with PRECONDITION_FAILED, thrown synchronously, when the limit is negative
 */
protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnSubscription(Integer maxUnackedNum,
                                                                              boolean isGlobal) {
    final boolean negative = maxUnackedNum != null && maxUnackedNum < 0;
    if (negative) {
        throw new RestException(Status.PRECONDITION_FAILED,
                "maxUnackedNum must be 0 or more");
    }
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenCompose(existing -> {
                final TopicPolicies updated = existing.orElseGet(TopicPolicies::new);
                updated.setMaxUnackedMessagesOnSubscription(maxUnackedNum);
                updated.setIsGlobal(isGlobal);
                return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, updated);
            });
}
/**
 * Looks up the max-unacked-messages-per-consumer limit of the topic.
 *
 * @param applied  when true and no topic-level value exists, fall back to the namespace policies and
 *                 then to the broker configuration
 * @param isGlobal forwarded to the topic-policies lookup
 * @return a future holding the resolved limit, or {@code null} when nothing applies
 */
protected CompletableFuture<Integer> internalGetMaxUnackedMessagesOnConsumer(boolean applied, boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenApply(op -> {
                final Optional<Integer> topicLevel = op.map(TopicPolicies::getMaxUnackedMessagesOnConsumer);
                if (topicLevel.isPresent()) {
                    return topicLevel.get();
                }
                if (!applied) {
                    return null;
                }
                final Integer namespaceLevel = getNamespacePolicies(namespaceName).max_unacked_messages_per_consumer;
                if (namespaceLevel != null) {
                    return namespaceLevel;
                }
                final Integer brokerLevel = config().getMaxUnackedMessagesPerConsumer();
                return brokerLevel;
            });
}
/**
 * Stores the max-unacked-messages-per-consumer limit on the topic policies.
 *
 * @param maxUnackedNum new limit; {@code null} is passed to the setter unchanged
 * @param isGlobal      forwarded to the lookup and recorded on the stored topic policies
 * @return the future of the topic-policies update
 * @throws RestException with PRECONDITION_FAILED, thrown synchronously, when the limit is negative
 */
protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnConsumer(Integer maxUnackedNum,
                                                                          boolean isGlobal) {
    if (maxUnackedNum != null && maxUnackedNum < 0) {
        throw new RestException(Status.PRECONDITION_FAILED,
                "maxUnackedNum must be 0 or more");
    }
    // Look the policies up with the same isGlobal flag that is stored below, like the sibling setters
    // do; otherwise a global update would start from the non-global policies.
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenCompose(op -> {
                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
                topicPolicies.setMaxUnackedMessagesOnConsumer(maxUnackedNum);
                topicPolicies.setIsGlobal(isGlobal);
                return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
            });
}
/**
 * Stores the deduplication snapshot interval on the topic policies.
 *
 * @param interval new interval, stored through {@code setDeduplicationSnapshotIntervalSeconds};
 *                 {@code null} is passed to the setter unchanged
 * @param isGlobal forwarded to the lookup and recorded on the stored topic policies
 * @return the future of the topic-policies update
 * @throws RestException with PRECONDITION_FAILED, thrown synchronously, when the interval is negative
 */
protected CompletableFuture<Void> internalSetDeduplicationSnapshotInterval(Integer interval, boolean isGlobal) {
    final boolean negative = interval != null && interval < 0;
    if (negative) {
        throw new RestException(Status.PRECONDITION_FAILED, "interval must be 0 or more");
    }
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenCompose(existing -> {
                final TopicPolicies updated = existing.orElseGet(TopicPolicies::new);
                updated.setDeduplicationSnapshotIntervalSeconds(interval);
                updated.setIsGlobal(isGlobal);
                return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, updated);
            });
}
/**
 * Unloads a single (non-partitioned or partition) topic: validates topic ownership and the UNLOAD
 * operation, closes the topic reference and resumes the response with 204 No Content.
 *
 * @param asyncResponse response resumed with the outcome
 * @param authoritative forwarded to the topic ownership validation
 */
private void internalUnloadNonPartitionedTopicAsync(AsyncResponse asyncResponse, boolean authoritative) {
    validateTopicOwnershipAsync(topicName, authoritative)
            .thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.UNLOAD)
                    .thenCompose(__ -> getTopicReferenceAsync(topicName))
                    .thenCompose(topic -> topic.close(false))
                    .thenRun(() -> {
                        log.info("[{}] Successfully unloaded topic {}", clientAppId(), topicName);
                        asyncResponse.resume(Response.noContent().build());
                    }))
            .exceptionally(ex -> {
                // If the exception is not redirect exception we need to log it.
                if (!isRedirectException(ex)) {
                    // The exception is passed as the trailing argument without a placeholder of its
                    // own, so the logger records it as a throwable (with stack trace).
                    log.error("[{}] Failed to unload topic {}", clientAppId(), topicName, ex);
                }
                resumeAsyncResponseExceptionally(asyncResponse, ex);
                return null;
            });
}
/**
 * Unloads the transaction coordinator identified by the partition index of the topic name: validates
 * topic ownership and the UNLOAD operation, removes the transaction metadata store and resumes the
 * response with 204 No Content.
 *
 * @param asyncResponse response resumed with the outcome
 * @param authoritative forwarded to the topic ownership validation
 */
private void internalUnloadTransactionCoordinatorAsync(AsyncResponse asyncResponse, boolean authoritative) {
    validateTopicOwnershipAsync(topicName, authoritative)
            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.UNLOAD)
                    .thenCompose(v -> pulsar()
                            .getTransactionMetadataStoreService()
                            .removeTransactionMetadataStore(
                                    TransactionCoordinatorID.get(topicName.getPartitionIndex())))
                    .thenRun(() -> {
                        log.info("[{}] Successfully unloaded tc {}", clientAppId(),
                                topicName.getPartitionIndex());
                        asyncResponse.resume(Response.noContent().build());
                    }))
            .exceptionally(ex -> {
                // If the exception is not redirect exception we need to log it.
                if (!isRedirectException(ex)) {
                    // The exception is passed as the trailing argument without a placeholder of its
                    // own, so the logger records it as a throwable (with stack trace).
                    log.error("[{}] Failed to unload tc {}", clientAppId(),
                            topicName.getPartitionIndex(), ex);
                }
                resumeAsyncResponseExceptionally(asyncResponse, ex);
                return null;
            });
}
/**
 * Deletes the topic after validating the DELETE_TOPIC namespace operation and the topic ownership.
 *
 * @param authoritative forwarded to the topic ownership validation
 * @param force         forwarded to the broker service delete call
 * @return the future of the broker service delete call
 */
protected CompletableFuture<Void> internalDeleteTopicAsync(boolean authoritative, boolean force) {
    return validateNamespaceOperationAsync(topicName.getNamespaceObject(), NamespaceOperation.DELETE_TOPIC)
            .thenCompose(authorized -> validateTopicOwnershipAsync(topicName, authoritative))
            .thenCompose(owned -> {
                final String topic = topicName.toString();
                return pulsar().getBrokerService().deleteTopic(topic, force);
            });
}
/**
 * Lists the subscriptions of the topic and resumes {@code asyncResponse} with them.
 *
 * <p>After validating global namespace ownership (global topics only), topic ownership and the
 * GET_SUBSCRIPTIONS operation, a partition name or a topic without partitions is answered from the
 * local topic reference. For a partitioned topic the subscriptions of the partitions are collected
 * through the admin client; for persistent topics only partitions that exist are queried.
 *
 * <p>Every asynchronous stage has a failure handler so that the response is always resumed,
 * including when the initial validation fails or when the partition existence checks fail.
 *
 * @param asyncResponse response resumed with the list of subscription names or with the failure
 * @param authoritative forwarded to the ownership validation and the metadata lookup
 */
protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean authoritative) {
    CompletableFuture<Void> future;
    if (topicName.isGlobal()) {
        future = validateGlobalNamespaceOwnershipAsync(namespaceName);
    } else {
        future = CompletableFuture.completedFuture(null);
    }
    future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
            .thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.GET_SUBSCRIPTIONS))
            .thenAccept(unused1 -> {
                // If the topic name is a partition name, no need to get partition topic metadata again
                if (topicName.isPartitioned()) {
                    internalGetSubscriptionsForNonPartitionedTopic(asyncResponse);
                    return;
                }
                getPartitionedTopicMetadataAsync(topicName, authoritative, false)
                        .thenAccept(partitionMetadata -> {
                            if (partitionMetadata.partitions <= 0) {
                                internalGetSubscriptionsForNonPartitionedTopic(asyncResponse);
                                return;
                            }
                            try {
                                final Set<String> subscriptions =
                                        Collections.newSetFromMap(
                                                new ConcurrentHashMap<>(partitionMetadata.partitions));
                                final List<CompletableFuture<Object>> subscriptionFutures = Lists.newArrayList();
                                if (topicName.getDomain() == TopicDomain.persistent) {
                                    final Map<Integer, CompletableFuture<Boolean>> existsFutures =
                                            new ConcurrentHashMap<>(partitionMetadata.partitions);
                                    for (int i = 0; i < partitionMetadata.partitions; i++) {
                                        existsFutures.put(i,
                                                topicResources().persistentTopicExists(topicName.getPartition(i)));
                                    }
                                    FutureUtil.waitForAll(Lists.newArrayList(existsFutures.values()))
                                            .thenApply(unused2 ->
                                                    // All futures are complete here, so join() does not block.
                                                    existsFutures.entrySet().stream()
                                                            .filter(e -> e.getValue().join())
                                                            .map(item -> topicName.getPartition(item.getKey())
                                                                    .toString())
                                                            .collect(Collectors.toList())
                                            ).thenAccept(topics -> {
                                                if (log.isDebugEnabled()) {
                                                    log.debug("activeTopics : {}", topics);
                                                }
                                                topics.forEach(topic -> {
                                                    try {
                                                        CompletableFuture<List<String>> subscriptionsAsync =
                                                                pulsar().getAdminClient()
                                                                        .topics().getSubscriptionsAsync(topic);
                                                        subscriptionFutures.add(subscriptionsAsync
                                                                .thenApply(subscriptions::addAll));
                                                    } catch (PulsarServerException e) {
                                                        throw new RestException(e);
                                                    }
                                                });
                                            }).thenAccept(unused3 -> resumeAsyncResponse(asyncResponse,
                                                    subscriptions, subscriptionFutures))
                                            .exceptionally(ex -> {
                                                // Without this handler a failed existence check or admin
                                                // client lookup would leave the request hanging.
                                                log.error("[{}] Failed to get list of subscriptions for {}",
                                                        clientAppId(), topicName, ex);
                                                resumeAsyncResponseExceptionally(asyncResponse, ex);
                                                return null;
                                            });
                                } else {
                                    for (int i = 0; i < partitionMetadata.partitions; i++) {
                                        CompletableFuture<List<String>> subscriptionsAsync = pulsar()
                                                .getAdminClient().topics()
                                                .getSubscriptionsAsync(topicName.getPartition(i).toString());
                                        subscriptionFutures.add(subscriptionsAsync
                                                .thenApply(subscriptions::addAll));
                                    }
                                    resumeAsyncResponse(asyncResponse, subscriptions, subscriptionFutures);
                                }
                            } catch (Exception e) {
                                log.error("[{}] Failed to get list of subscriptions for {}",
                                        clientAppId(), topicName, e);
                                asyncResponse.resume(e);
                            }
                        }).exceptionally(ex -> {
                            // If the exception is not redirect exception we need to log it.
                            if (!isRedirectException(ex)) {
                                log.error("[{}] Failed to get partitioned topic metadata while get"
                                        + " subscriptions for topic {}", clientAppId(), topicName, ex);
                            }
                            resumeAsyncResponseExceptionally(asyncResponse, ex);
                            return null;
                        });
            }).exceptionally(ex -> {
                // Attached to the whole chain so that a failed global-namespace validation is
                // reported too, not only a failed topic ownership/operation validation.
                // If the exception is not redirect exception we need to log it.
                if (!isRedirectException(ex)) {
                    log.error("[{}] Failed to validate the global namespace/topic ownership while get subscriptions"
                            + " for topic {}", clientAppId(), topicName, ex);
                }
                resumeAsyncResponseExceptionally(asyncResponse, ex);
                return null;
            });
}
/**
 * Waits for all per-partition subscription lookups and resumes the response.
 *
 * @param asyncResponse       response resumed with a copy of {@code subscriptions} on success, with
 *                            NOT_FOUND when the admin client reported 404, or with the failure
 * @param subscriptions       set the lookups add their results to
 * @param subscriptionFutures the pending lookups
 */
private void resumeAsyncResponse(AsyncResponse asyncResponse, Set<String> subscriptions,
                                 List<CompletableFuture<Object>> subscriptionFutures) {
    FutureUtil.waitForAll(subscriptionFutures).whenComplete((r, ex) -> {
        if (ex == null) {
            asyncResponse.resume(new ArrayList<>(subscriptions));
            return;
        }
        // Failures of dependent stages arrive wrapped in a CompletionException; unwrap first so the
        // PulsarAdminException check below can actually match.
        Throwable realCause = FutureUtil.unwrapCompletionException(ex);
        log.warn("[{}] Failed to get list of subscriptions for {}: {}", clientAppId(),
                topicName, realCause.getMessage());
        if (realCause instanceof PulsarAdminException) {
            PulsarAdminException pae = (PulsarAdminException) realCause;
            if (pae.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                asyncResponse.resume(new RestException(Status.NOT_FOUND,
                        "Internal topics have not been generated yet"));
            } else {
                asyncResponse.resume(new RestException(pae));
            }
        } else {
            asyncResponse.resume(new RestException(realCause));
        }
    });
}
/**
 * Resumes the response with the subscription names of the local topic reference.
 *
 * @param asyncResponse response resumed with the list of names or with the failure
 */
private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse) {
    getTopicReferenceAsync(topicName)
            .thenAccept(topicRef -> {
                asyncResponse.resume(Lists.newArrayList(topicRef.getSubscriptions().keys()));
            })
            .exceptionally(failure -> {
                // Redirects are expected control flow; only real failures are logged.
                final boolean redirect = isRedirectException(failure);
                if (!redirect) {
                    log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, failure);
                }
                resumeAsyncResponseExceptionally(asyncResponse, failure);
                return null;
            });
}
/**
 * Fetches the stats of the topic after validating global namespace ownership (global topics only),
 * topic ownership and the GET_STATS operation.
 *
 * @param authoritative            forwarded to the topic ownership validation
 * @param getPreciseBacklog        forwarded to {@code asyncGetStats}
 * @param subscriptionBacklogSize  forwarded to {@code asyncGetStats}
 * @param getEarliestTimeInBacklog forwarded to {@code asyncGetStats}
 * @return the future returned by the topic's {@code asyncGetStats}
 */
protected CompletableFuture<? extends TopicStats> internalGetStatsAsync(boolean authoritative,
                                                                        boolean getPreciseBacklog,
                                                                        boolean subscriptionBacklogSize,
                                                                        boolean getEarliestTimeInBacklog) {
    final CompletableFuture<Void> ownershipCheck = topicName.isGlobal()
            ? validateGlobalNamespaceOwnershipAsync(namespaceName)
            : CompletableFuture.<Void>completedFuture(null);
    return ownershipCheck
            .thenCompose(ignored -> validateTopicOwnershipAsync(topicName, authoritative))
            .thenComposeAsync(ignored -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS))
            .thenCompose(ignored -> getTopicReferenceAsync(topicName))
            .thenCompose(topicRef ->
                    topicRef.asyncGetStats(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog));
}
/**
 * Fetches the internal stats of the topic after validating global namespace ownership (global topics
 * only), topic ownership and the GET_STATS operation.
 *
 * @param authoritative forwarded to the topic ownership validation
 * @param metadata      when true, the GET_METADATA operation is validated as well; also forwarded to
 *                      {@code getInternalStats}
 * @return the future returned by the topic's {@code getInternalStats}
 */
protected CompletableFuture<PersistentTopicInternalStats> internalGetInternalStatsAsync(boolean authoritative,
                                                                                        boolean metadata) {
    final CompletableFuture<Void> ownershipCheck = topicName.isGlobal()
            ? validateGlobalNamespaceOwnershipAsync(namespaceName)
            : CompletableFuture.<Void>completedFuture(null);
    return ownershipCheck
            .thenCompose(ignored -> validateTopicOwnershipAsync(topicName, authoritative))
            .thenCompose(ignored -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS))
            .thenCompose(ignored -> {
                if (!metadata) {
                    return CompletableFuture.completedFuture(null);
                }
                return validateTopicOperationAsync(topicName, TopicOperation.GET_METADATA);
            })
            .thenCompose(ignored -> getTopicReferenceAsync(topicName))
            .thenCompose(topicRef -> topicRef.getInternalStats(metadata));
}
/**
 * Returns the managed-ledger info of the topic through {@code asyncResponse}.
 *
 * <p>A partition name or a topic without partitions is answered by
 * {@code internalGetManagedLedgerInfoForNonPartitionedTopic}. For a partitioned topic the info of
 * every partition is fetched through the admin client, parsed and returned as one
 * {@code PartitionedManagedLedgerInfo} streamed as JSON.
 *
 * @param asyncResponse response resumed with the streamed info or with the failure
 * @param authoritative forwarded to the partitioned metadata lookup
 */
protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse, boolean authoritative) {
    CompletableFuture<Void> future;
    if (topicName.isGlobal()) {
        future = validateGlobalNamespaceOwnershipAsync(namespaceName);
    } else {
        future = CompletableFuture.completedFuture(null);
    }
    future.thenAccept(__ -> {
        // If the topic name is a partition name, no need to get partition topic metadata again
        if (topicName.isPartitioned()) {
            internalGetManagedLedgerInfoForNonPartitionedTopic(asyncResponse);
        } else {
            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
                    .thenAccept(partitionMetadata -> {
                        if (partitionMetadata.partitions > 0) {
                            final List<CompletableFuture<String>> futures =
                                    Lists.newArrayListWithCapacity(partitionMetadata.partitions);
                            PartitionedManagedLedgerInfo partitionedManagedLedgerInfo =
                                    new PartitionedManagedLedgerInfo();
                            for (int i = 0; i < partitionMetadata.partitions; i++) {
                                TopicName topicNamePartition = topicName.getPartition(i);
                                try {
                                    futures.add(pulsar().getAdminClient().topics()
                                            .getInternalInfoAsync(topicNamePartition.toString())
                                            .whenComplete((response, throwable) -> {
                                                if (throwable != null) {
                                                    log.error("[{}] Failed to get managed info for {}",
                                                            clientAppId(), topicNamePartition, throwable);
                                                    asyncResponse.resume(new RestException(throwable));
                                                    // There is no response to parse after a failure.
                                                    return;
                                                }
                                                try {
                                                    partitionedManagedLedgerInfo.partitions
                                                            .put(topicNamePartition.toString(), jsonMapper()
                                                                    .readValue(response, ManagedLedgerInfo.class));
                                                } catch (JsonProcessingException ex) {
                                                    log.error("[{}] Failed to parse ManagedLedgerInfo for {} from [{}]",
                                                            clientAppId(), topicNamePartition, response, ex);
                                                }
                                            })
                                    );
                                } catch (PulsarServerException e) {
                                    log.error("[{}] Failed to get admin client while get managed info for {}",
                                            clientAppId(), topicNamePartition, e);
                                    throw new RestException(e);
                                }
                            }
                            FutureUtil.waitForAll(futures).handle((result, exception) -> {
                                if (exception != null) {
                                    Throwable t = exception.getCause();
                                    if (t instanceof NotFoundException) {
                                        asyncResponse.resume(new RestException(Status.NOT_FOUND,
                                                getTopicNotFoundErrorMessage(topicName.toString())));
                                    } else {
                                        log.error("[{}] Failed to get managed info for {}",
                                                clientAppId(), topicName, t);
                                        asyncResponse.resume(new RestException(t));
                                    }
                                    // Do not fall through to the success response after an error.
                                    return null;
                                }
                                asyncResponse.resume((StreamingOutput) output -> {
                                    jsonMapper().writer().writeValue(output, partitionedManagedLedgerInfo);
                                });
                                return null;
                            });
                        } else {
                            internalGetManagedLedgerInfoForNonPartitionedTopic(asyncResponse);
                        }
                    }).exceptionally(ex -> {
                        // If the exception is not redirect exception we need to log it.
                        if (!isRedirectException(ex)) {
                            log.error("[{}] Failed to get partitioned metadata while get managed info for {}",
                                    clientAppId(), topicName, ex);
                        }
                        resumeAsyncResponseExceptionally(asyncResponse, ex);
                        return null;
                    });
        }
    }).exceptionally(ex -> {
        // If the exception is not redirect exception we need to log it.
        if (!isRedirectException(ex)) {
            log.error("[{}] Failed to validate the global namespace ownership while get managed info for {}",
                    clientAppId(), topicName, ex);
        }
        resumeAsyncResponseExceptionally(asyncResponse, ex);
        return null;
    });
}
/**
 * Validates the GET_STATS operation, then asks the managed-ledger factory for the info of the ledger
 * named by {@code topicName.getPersistenceNamingEncoding()} and streams it as JSON.
 *
 * @param asyncResponse response resumed with the streamed info or with the failure
 */
protected void internalGetManagedLedgerInfoForNonPartitionedTopic(AsyncResponse asyncResponse) {
    validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)
            .thenAccept(ignored -> {
                final String ledgerName = topicName.getPersistenceNamingEncoding();
                pulsar().getManagedLedgerFactory()
                        .asyncGetManagedLedgerInfo(ledgerName, new ManagedLedgerInfoCallback() {
                            @Override
                            public void getInfoComplete(ManagedLedgerInfo ledgerInfo, Object ctx) {
                                // Serialization happens lazily, when the container writes the entity.
                                final StreamingOutput body =
                                        output -> jsonMapper().writer().writeValue(output, ledgerInfo);
                                asyncResponse.resume(body);
                            }

                            @Override
                            public void getInfoFailed(ManagedLedgerException failure, Object ctx) {
                                asyncResponse.resume(failure);
                            }
                        }, null);
            })
            .exceptionally(failure -> {
                log.error("[{}] Failed to get managed info for {}", clientAppId(), topicName, failure);
                resumeAsyncResponseExceptionally(asyncResponse, failure);
                return null;
            });
}
/**
 * Aggregates the stats of all partitions of a partitioned topic and resumes {@code asyncResponse}
 * with the result.
 *
 * <p>Global namespace ownership is validated for global topics, then the partitioned metadata is
 * fetched. A topic whose metadata reports zero partitions is answered with NOT_FOUND. For each
 * partition the stats are read from the local topic reference when this broker owns the partition,
 * otherwise they are requested through the admin client.
 *
 * @param asyncResponse            response resumed with the aggregated stats or with the failure
 * @param authoritative            forwarded to the partitioned metadata lookup
 * @param perPartition             when true, the stats of each partition are also stored by name
 * @param getPreciseBacklog        forwarded to the per-partition stats calls
 * @param subscriptionBacklogSize  forwarded to the per-partition stats calls
 * @param getEarliestTimeInBacklog forwarded to the per-partition stats calls
 */
protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative, boolean perPartition,
boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
future = CompletableFuture.completedFuture(null);
}
future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
authoritative, false)).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions == 0) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
getPartitionedTopicNotFoundErrorMessage(topicName.toString())));
return;
}
PartitionedTopicStatsImpl stats = new PartitionedTopicStatsImpl(partitionMetadata);
List<CompletableFuture<TopicStats>> topicStatsFutureList = new ArrayList<>(partitionMetadata.partitions);
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName partition = topicName.getPartition(i);
topicStatsFutureList.add(
pulsar().getNamespaceService()
.isServiceUnitOwnedAsync(partition)
.thenCompose(owned -> {
if (owned) {
// Partition is served by this broker: read the stats from the local topic reference.
return getTopicReferenceAsync(partition)
.thenApply(ref ->
ref.getStats(getPreciseBacklog, subscriptionBacklogSize,
getEarliestTimeInBacklog));
} else {
// Partition is owned elsewhere: ask through the admin client.
try {
return pulsar().getAdminClient().topics().getStatsAsync(
partition.toString(), getPreciseBacklog, subscriptionBacklogSize,
getEarliestTimeInBacklog);
} catch (PulsarServerException e) {
return FutureUtil.failedFuture(e);
}
}
})
);
}
// The aggregate exception is not inspected: partitions whose lookup failed are skipped below.
FutureUtil.waitForAll(topicStatsFutureList).handle((result, exception) -> {
CompletableFuture<TopicStats> statFuture = null;
for (int i = 0; i < topicStatsFutureList.size(); i++) {
statFuture = topicStatsFutureList.get(i);
if (statFuture.isDone() && !statFuture.isCompletedExceptionally()) {
try {
stats.add(statFuture.get());
if (perPartition) {
stats.getPartitions().put(topicName.getPartition(i).toString(), statFuture.get());
}
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
return null;
}
}
}
// No partition produced stats: answer with an empty entry if the partitioned topic exists,
// NOT_FOUND otherwise.
if (perPartition && stats.partitions.isEmpty()) {
try {
// NOTE(review): partitionedTopicExists looks like a synchronous metadata call made from
// a future callback — confirm it cannot block this thread.
boolean pathExists = namespaceResources().getPartitionedTopicResources()
.partitionedTopicExists(topicName);
if (pathExists) {
stats.partitions.put(topicName.toString(), new TopicStatsImpl());
} else {
asyncResponse.resume(
new RestException(Status.NOT_FOUND,
"Internal topics have not been generated yet"));
return null;
}
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
return null;
}
}
asyncResponse.resume(stats);
return null;
});
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
// NOTE(review): the message says "internal stats" although this method serves the regular
// partitioned stats — presumably copied from internalGetPartitionedStatsInternal.
log.error("[{}] Failed to get partitioned internal stats for {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
/**
 * Collects the internal stats of all partitions of a partitioned topic through the admin client and
 * resumes {@code asyncResponse} with the result.
 *
 * <p>A topic whose metadata reports zero partitions, or for which no partition returned stats, is
 * answered with NOT_FOUND. Partitions whose lookup failed are skipped.
 *
 * @param asyncResponse response resumed with the per-partition internal stats or with the failure
 * @param authoritative forwarded to the partitioned metadata lookup
 */
protected void internalGetPartitionedStatsInternal(AsyncResponse asyncResponse, boolean authoritative) {
    CompletableFuture<Void> future;
    if (topicName.isGlobal()) {
        future = validateGlobalNamespaceOwnershipAsync(namespaceName);
    } else {
        future = CompletableFuture.completedFuture(null);
    }
    future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
            .thenAccept(partitionMetadata -> {
                if (partitionMetadata.partitions == 0) {
                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
                            getPartitionedTopicNotFoundErrorMessage(topicName.toString())));
                    return;
                }
                PartitionedTopicInternalStats stats = new PartitionedTopicInternalStats(partitionMetadata);
                List<CompletableFuture<PersistentTopicInternalStats>> topicStatsFutureList = Lists.newArrayList();
                for (int i = 0; i < partitionMetadata.partitions; i++) {
                    try {
                        topicStatsFutureList.add(pulsar().getAdminClient().topics()
                                .getInternalStatsAsync((topicName.getPartition(i).toString()), false));
                    } catch (PulsarServerException e) {
                        asyncResponse.resume(new RestException(e));
                        return;
                    }
                }
                FutureUtil.waitForAll(topicStatsFutureList).handle((result, exception) -> {
                    for (int i = 0; i < topicStatsFutureList.size(); i++) {
                        CompletableFuture<PersistentTopicInternalStats> statFuture = topicStatsFutureList.get(i);
                        if (statFuture.isDone() && !statFuture.isCompletedExceptionally()) {
                            try {
                                stats.partitions.put(topicName.getPartition(i).toString(), statFuture.get());
                            } catch (Exception e) {
                                asyncResponse.resume(new RestException(e));
                                return null;
                            }
                        }
                    }
                    // Keep the two outcomes in separate calls: a conditional expression mixing the
                    // stats and the exception has static type Object and would select resume(Object),
                    // sending the exception as a response entity instead of failing the request.
                    if (stats.partitions.isEmpty()) {
                        asyncResponse.resume(
                                new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet"));
                    } else {
                        asyncResponse.resume(stats);
                    }
                    return null;
                });
            }).exceptionally(ex -> {
                // If the exception is not redirect exception we need to log it.
                if (!isRedirectException(ex)) {
                    log.error("[{}] Failed to get partitioned internal stats for {}", clientAppId(), topicName, ex);
                }
                resumeAsyncResponseExceptionally(asyncResponse, ex);
                return null;
            });
}
/**
 * Deletes a subscription, dispatching to the forceful or the regular variant.
 *
 * @param asyncResponse response resumed by the selected variant
 * @param subName       name of the subscription to delete
 * @param authoritative forwarded to the selected variant
 * @param force         selects {@code internalDeleteSubscriptionForcefully} when true
 */
protected void internalDeleteSubscription(AsyncResponse asyncResponse,
                                          String subName, boolean authoritative, boolean force) {
    if (!force) {
        internalDeleteSubscription(asyncResponse, subName, authoritative);
        return;
    }
    internalDeleteSubscriptionForcefully(asyncResponse, subName, authoritative);
}
/**
 * Deletes a subscription (non-forceful variant) and resumes {@code asyncResponse} with the outcome.
 *
 * <p>After validating global namespace ownership (global topics only) and topic ownership, a
 * partition name or a topic without partitions is handled by
 * {@code internalDeleteSubscriptionForNonPartitionedTopic}. For a partitioned topic the subscription
 * is deleted on every partition through the admin client.
 *
 * @param asyncResponse response resumed with 204 No Content on success; NOT_FOUND or
 *                      PRECONDITION_FAILED when a partition reports the matching admin exception
 * @param subName       name of the subscription to delete
 * @param authoritative forwarded to the ownership validation and the metadata lookup
 */
protected void internalDeleteSubscription(AsyncResponse asyncResponse, String subName, boolean authoritative) {
    CompletableFuture<Void> future;
    if (topicName.isGlobal()) {
        future = validateGlobalNamespaceOwnershipAsync(namespaceName);
    } else {
        future = CompletableFuture.completedFuture(null);
    }
    future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> {
        if (topicName.isPartitioned()) {
            internalDeleteSubscriptionForNonPartitionedTopic(asyncResponse, subName, authoritative);
        } else {
            getPartitionedTopicMetadataAsync(topicName,
                    authoritative, false).thenAcceptAsync(partitionMetadata -> {
                if (partitionMetadata.partitions > 0) {
                    final List<CompletableFuture<Void>> futures = Lists.newArrayList();
                    for (int i = 0; i < partitionMetadata.partitions; i++) {
                        TopicName topicNamePartition = topicName.getPartition(i);
                        try {
                            futures.add(pulsar().getAdminClient().topics()
                                    .deleteSubscriptionAsync(topicNamePartition.toString(), subName, false));
                        } catch (Exception e) {
                            log.error("[{}] Failed to delete subscription {} {}",
                                    clientAppId(), topicNamePartition, subName,
                                    e);
                            asyncResponse.resume(new RestException(e));
                            return;
                        }
                    }
                    FutureUtil.waitForAll(futures).handle((result, exception) -> {
                        if (exception != null) {
                            Throwable t = exception.getCause();
                            if (t instanceof NotFoundException) {
                                asyncResponse.resume(new RestException(Status.NOT_FOUND,
                                        getSubNotFoundErrorMessage(topicName.toString(), subName)));
                                return null;
                            } else if (t instanceof PreconditionFailedException) {
                                asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
                                        "Subscription has active connected consumers"));
                                return null;
                            } else {
                                log.error("[{}] Failed to delete subscription {} {}",
                                        clientAppId(), topicName, subName, t);
                                asyncResponse.resume(new RestException(t));
                                return null;
                            }
                        }
                        asyncResponse.resume(Response.noContent().build());
                        return null;
                    });
                } else {
                    internalDeleteSubscriptionForNonPartitionedTopic(asyncResponse, subName, authoritative);
                }
            }, pulsar().getExecutor()).exceptionally(ex -> {
                // If the exception is not redirect exception we need to log it.
                if (!isRedirectException(ex)) {
                    log.error("[{}] Failed to delete subscription {} from topic {}",
                            clientAppId(), subName, topicName, ex);
                }
                resumeAsyncResponseExceptionally(asyncResponse, ex);
                return null;
            });
        }
    }).exceptionally(ex -> {
        // If the exception is not redirect exception we need to log it.
        if (!isRedirectException(ex)) {
            log.error("[{}] Failed to delete subscription {} from topic {}",
                    clientAppId(), subName, topicName, ex);
        }
        resumeAsyncResponseExceptionally(asyncResponse, ex);
        return null;
    });
}
/**
 * Deletes a subscription of a single (non-partitioned or partition) topic.
 *
 * @param asyncResponse response resumed with 204 No Content on success, PRECONDITION_FAILED when the
 *                      deletion fails with {@code SubscriptionBusyException}, NOT_FOUND when the
 *                      subscription does not exist, or with the failure otherwise
 * @param subName       name of the subscription to delete
 * @param authoritative forwarded to the topic ownership validation
 */
private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse,
                                                              String subName, boolean authoritative) {
    validateTopicOwnershipAsync(topicName, authoritative)
            // Use the asynchronous validation so the completing thread is not blocked.
            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.UNSUBSCRIBE, subName))
            .thenCompose(__ -> getTopicReferenceAsync(topicName))
            .thenCompose(topic -> {
                Subscription sub = topic.getSubscription(subName);
                if (sub == null) {
                    throw new RestException(Status.NOT_FOUND,
                            getSubNotFoundErrorMessage(topicName.toString(), subName));
                }
                return sub.delete();
            }).thenRun(() -> {
                log.info("[{}][{}] Deleted subscription {}", clientAppId(), topicName, subName);
                asyncResponse.resume(Response.noContent().build());
            }).exceptionally(ex -> {
                // Unwrap instead of calling getCause(): the latter is null for an unwrapped failure.
                Throwable cause = FutureUtil.unwrapCompletionException(ex);
                if (cause instanceof SubscriptionBusyException) {
                    log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName,
                            topicName, cause);
                    asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
                            "Subscription has active connected consumers"));
                } else {
                    // If the exception is not redirect exception we need to log it.
                    if (!isRedirectException(ex)) {
                        log.error("[{}] Failed to delete subscription {} {}",
                                clientAppId(), topicName, subName, cause);
                    }
                    asyncResponse.resume(new RestException(cause));
                }
                return null;
            });
}
/**
 * Analyzes the backlog of a subscription of a single (non-partitioned or partition) topic and resumes
 * {@code asyncResponse} with an {@code AnalyzeSubscriptionBacklogResult}.
 *
 * <p>First and last positions are reported as {@code ledgerId:entryId}; the result is flagged as
 * aborted when the scan outcome is not {@code COMPLETED}.
 *
 * @param asyncResponse response resumed with the result, NOT_FOUND when the subscription does not
 *                      exist, or with the failure
 * @param subName       name of the subscription to analyze
 * @param position      forwarded to {@code Subscription.analyzeBacklog}
 * @param authoritative forwarded to the topic ownership validation
 */
private void internalAnalyzeSubscriptionBacklogForNonPartitionedTopic(AsyncResponse asyncResponse,
                                                                      String subName,
                                                                      Optional<Position> position,
                                                                      boolean authoritative) {
    validateTopicOwnershipAsync(topicName, authoritative)
            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName))
            .thenCompose(__ -> getTopicReferenceAsync(topicName))
            .thenCompose(topic -> {
                Subscription sub = topic.getSubscription(subName);
                if (sub == null) {
                    throw new RestException(Status.NOT_FOUND,
                            getSubNotFoundErrorMessage(topicName.toString(), subName));
                }
                return sub.analyzeBacklog(position);
            })
            .thenAccept((AnalyzeBacklogResult rawResult) -> {
                AnalyzeSubscriptionBacklogResult result = new AnalyzeSubscriptionBacklogResult();
                if (rawResult.getFirstPosition() != null) {
                    result.setFirstMessageId(
                            rawResult.getFirstPosition().getLedgerId()
                                    + ":"
                                    + rawResult.getFirstPosition().getEntryId());
                }
                if (rawResult.getLastPosition() != null) {
                    result.setLastMessageId(rawResult.getLastPosition().getLedgerId()
                            + ":"
                            + rawResult.getLastPosition().getEntryId());
                }
                result.setEntries(rawResult.getEntries());
                result.setMessages(rawResult.getMessages());
                result.setFilterAcceptedEntries(rawResult.getFilterAcceptedEntries());
                result.setFilterRejectedEntries(rawResult.getFilterRejectedEntries());
                result.setFilterRescheduledEntries(rawResult.getFilterRescheduledEntries());
                result.setFilterAcceptedMessages(rawResult.getFilterAcceptedMessages());
                result.setFilterRejectedMessages(rawResult.getFilterRejectedMessages());
                result.setFilterRescheduledMessages(rawResult.getFilterRescheduledMessages());
                result.setAborted(rawResult.getScanOutcome() != ScanOutcome.COMPLETED);
                // Argument order follows the placeholders: topic first, then subscription.
                log.info("[{}] analyzeBacklog topic {} subscription {} result {}", clientAppId(), topicName,
                        subName, result);
                asyncResponse.resume(result);
            }).exceptionally(ex -> {
                // Unwrap instead of calling getCause(): the latter is null for an unwrapped failure.
                Throwable cause = FutureUtil.unwrapCompletionException(ex);
                // If the exception is not redirect exception we need to log it.
                if (!isRedirectException(ex)) {
                    log.error("[{}] Failed to analyze subscription backlog {} {}",
                            clientAppId(), topicName, subName, cause);
                }
                resumeAsyncResponseExceptionally(asyncResponse, cause);
                return null;
            });
}
/**
 * Updates the properties of a subscription of a single (non-partitioned or partition) topic.
 *
 * @param asyncResponse          response resumed with 204 No Content on success, NOT_FOUND when the
 *                               subscription does not exist, or with the failure
 * @param subName                name of the subscription to update
 * @param subscriptionProperties forwarded to {@code Subscription.updateSubscriptionProperties}
 * @param authoritative          forwarded to the topic ownership validation
 */
private void internalUpdateSubscriptionPropertiesForNonPartitionedTopic(AsyncResponse asyncResponse,
                                      String subName, Map<String, String> subscriptionProperties,
                                      boolean authoritative) {
    validateTopicOwnershipAsync(topicName, authoritative)
            // Use the asynchronous validation so the completing thread is not blocked.
            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName))
            .thenCompose(__ -> getTopicReferenceAsync(topicName))
            .thenCompose(topic -> {
                Subscription sub = topic.getSubscription(subName);
                if (sub == null) {
                    throw new RestException(Status.NOT_FOUND,
                            getSubNotFoundErrorMessage(topicName.toString(), subName));
                }
                return sub.updateSubscriptionProperties(subscriptionProperties);
            }).thenRun(() -> {
                log.info("[{}][{}] Updated subscription {}", clientAppId(), topicName, subName);
                asyncResponse.resume(Response.noContent().build());
            }).exceptionally(ex -> {
                // Unwrap instead of calling getCause(): the latter is null for an unwrapped failure.
                Throwable cause = FutureUtil.unwrapCompletionException(ex);
                // If the exception is not redirect exception we need to log it.
                if (!isRedirectException(ex)) {
                    log.error("[{}] Failed to update subscription {} {}", clientAppId(), topicName, subName, cause);
                }
                asyncResponse.resume(new RestException(cause));
                return null;
            });
}
/**
 * Reads the properties of a subscription on a non-partitioned topic or on a single partition.
 *
 * <p>The chain validates topic ownership and the {@code CONSUME} operation for the subscription, looks up
 * the topic and resumes {@code asyncResponse} with the subscription properties (an empty map when the
 * subscription reports {@code null}). Failures resume the response with a {@link RestException} built
 * from the failure cause; a NOT_FOUND {@link RestException} is raised when the subscription does not exist.
 *
 * @param asyncResponse pending REST response that is resumed with the outcome
 * @param subName       name of the subscription whose properties are requested
 * @param authoritative forwarded to the topic ownership validation
 */
private void internalGetSubscriptionPropertiesForNonPartitionedTopic(AsyncResponse asyncResponse,
        String subName,
        boolean authoritative) {
    validateTopicOwnershipAsync(topicName, authoritative)
            // Asynchronous permission check, consistent with the other operations in this class.
            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName))
            .thenCompose(__ -> getTopicReferenceAsync(topicName))
            .thenApply((Topic topic) -> {
                Subscription sub = topic.getSubscription(subName);
                if (sub == null) {
                    throw new RestException(Status.NOT_FOUND,
                            getSubNotFoundErrorMessage(topicName.toString(), subName));
                }
                return sub.getSubscriptionProperties();
            }).thenAccept((Map<String, String> properties) -> {
                // Never serialize null: fall back to an empty map.
                Map<String, String> body = properties != null ? properties : Collections.emptyMap();
                asyncResponse.resume(Response.ok(body).build());
            }).exceptionally(ex -> {
                // Unwrap defensively: a bare getCause() is null when the failure is not wrapped.
                Throwable cause = FutureUtil.unwrapCompletionException(ex);
                // If the exception is not redirect exception we need to log it.
                if (!isRedirectException(ex)) {
                    log.error("[{}] Failed to get subscription properties {} {}",
                            clientAppId(), topicName, subName, cause);
                }
                asyncResponse.resume(new RestException(cause));
                return null;
            });
}
/**
 * Forcefully deletes a subscription, fanning out to every partition when the topic is partitioned.
 *
 * <p>For a global topic the namespace ownership is validated first. If the topic name already denotes a
 * partition, or the topic has no partitions, the work is delegated to
 * {@link #internalDeleteSubscriptionForNonPartitionedTopicForcefully}. Otherwise one forced delete is
 * issued per partition through the admin client and the response is resumed once all of them finished.
 *
 * @param asyncResponse pending REST response that is resumed with the outcome
 * @param subName       name of the subscription to delete
 * @param authoritative forwarded to the metadata lookup and to the non-partitioned implementation
 */
protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse,
                                                    String subName, boolean authoritative) {
    final CompletableFuture<Void> namespaceCheck = topicName.isGlobal()
            ? validateGlobalNamespaceOwnershipAsync(namespaceName)
            : CompletableFuture.completedFuture(null);

    namespaceCheck.thenAccept(__ -> {
        if (topicName.isPartitioned()) {
            internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse, subName, authoritative);
            return;
        }
        getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(metadata -> {
            if (metadata.partitions <= 0) {
                internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse, subName,
                        authoritative);
                return;
            }

            // One forced delete per partition, issued through the admin client.
            final List<CompletableFuture<Void>> deletions = Lists.newArrayList();
            for (int idx = 0; idx < metadata.partitions; idx++) {
                final TopicName partitionName = topicName.getPartition(idx);
                try {
                    deletions.add(pulsar().getAdminClient().topics()
                            .deleteSubscriptionAsync(partitionName.toString(), subName, true));
                } catch (Exception e) {
                    log.error("[{}] Failed to delete subscription forcefully {} {}",
                            clientAppId(), partitionName, subName,
                            e);
                    asyncResponse.resume(new RestException(e));
                    return;
                }
            }

            FutureUtil.waitForAll(deletions).handle((ignored, failure) -> {
                if (failure == null) {
                    asyncResponse.resume(Response.noContent().build());
                    return null;
                }
                final Throwable cause = failure.getCause();
                if (cause instanceof NotFoundException) {
                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
                            getSubNotFoundErrorMessage(topicName.toString(), subName)));
                } else {
                    log.error("[{}] Failed to delete subscription forcefully {} {}",
                            clientAppId(), topicName, subName, cause);
                    asyncResponse.resume(new RestException(cause));
                }
                return null;
            });
        }).exceptionally(ex -> {
            // If the exception is not redirect exception we need to log it.
            if (!isRedirectException(ex)) {
                log.error("[{}] Failed to delete subscription forcefully {} from topic {}",
                        clientAppId(), subName, topicName, ex);
            }
            resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }).exceptionally(ex -> {
        // If the exception is not redirect exception we need to log it.
        if (!isRedirectException(ex)) {
            log.error("[{}] Failed to delete subscription {} from topic {}",
                    clientAppId(), subName, topicName, ex);
        }
        resumeAsyncResponseExceptionally(asyncResponse, ex);
        return null;
    });
}
/**
 * Forcefully deletes a subscription on a non-partitioned topic or on a single partition.
 *
 * <p>The chain validates topic ownership and the {@code UNSUBSCRIBE} operation for the subscription, looks
 * up the topic and calls {@code Subscription#deleteForcefully}. The response is resumed with 204 No Content
 * on success; failures are passed to {@code resumeAsyncResponseExceptionally}.
 *
 * @param asyncResponse pending REST response that is resumed with the outcome
 * @param subName       name of the subscription to delete
 * @param authoritative forwarded to the topic ownership validation
 */
private void internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse asyncResponse,
        String subName, boolean authoritative) {
    validateTopicOwnershipAsync(topicName, authoritative)
            // Asynchronous permission check, consistent with the other operations in this class.
            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.UNSUBSCRIBE, subName))
            .thenCompose(__ -> getTopicReferenceAsync(topicName))
            .thenCompose(topic -> {
                Subscription sub = topic.getSubscription(subName);
                if (sub == null) {
                    throw new RestException(Status.NOT_FOUND,
                            getSubNotFoundErrorMessage(topicName.toString(), subName));
                }
                return sub.deleteForcefully();
            }).thenRun(() -> {
                log.info("[{}][{}] Deleted subscription forcefully {}", clientAppId(), topicName, subName);
                asyncResponse.resume(Response.noContent().build());
            }).exceptionally(ex -> {
                // If the exception is not redirect exception we need to log it.
                if (!isRedirectException(ex)) {
                    log.error("[{}] Failed to delete subscription forcefully {} {}",
                            clientAppId(), topicName, subName, ex);
                }
                resumeAsyncResponseExceptionally(asyncResponse, ex);
                return null;
            });
}
/**
 * Clears the whole backlog of a subscription, fanning out to every partition when the topic is partitioned.
 *
 * <p>After the ownership and {@code SKIP} permission checks, a partition name or a topic without
 * partitions is handled by {@link #internalSkipAllMessagesForNonPartitionedTopicAsync}. For a partitioned
 * topic one skip-all request per partition is issued through the admin client and the response is resumed
 * once all of them finished.
 *
 * @param asyncResponse pending REST response that is resumed with the outcome
 * @param subName       name of the subscription whose backlog is cleared
 * @param authoritative forwarded to the ownership validation and the metadata lookup
 */
protected void internalSkipAllMessages(AsyncResponse asyncResponse, String subName, boolean authoritative) {
    final CompletableFuture<Void> namespaceCheck = topicName.isGlobal()
            ? validateGlobalNamespaceOwnershipAsync(namespaceName)
            : CompletableFuture.completedFuture(null);

    namespaceCheck
            .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName))
            .thenCompose(__ -> {
                // If the topic name is a partition name, no need to get partition topic metadata again
                if (topicName.isPartitioned()) {
                    return internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName,
                            authoritative);
                }
                return getPartitionedTopicMetadataAsync(topicName,
                        authoritative, false).thenCompose(metadata -> {
                    if (metadata.partitions <= 0) {
                        return internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName,
                                authoritative);
                    }

                    final List<CompletableFuture<Void>> skips = Lists.newArrayList();
                    for (int idx = 0; idx < metadata.partitions; idx++) {
                        final TopicName partitionName = topicName.getPartition(idx);
                        try {
                            skips.add(pulsar()
                                    .getAdminClient()
                                    .topics()
                                    .skipAllMessagesAsync(partitionName.toString(),
                                            subName));
                        } catch (Exception e) {
                            log.error("[{}] Failed to skip all messages {} {}",
                                    clientAppId(), partitionName, subName, e);
                            asyncResponse.resume(new RestException(e));
                            return CompletableFuture.completedFuture(null);
                        }
                    }

                    return FutureUtil.waitForAll(skips).handle((ignored, failure) -> {
                        if (failure == null) {
                            asyncResponse.resume(Response.noContent().build());
                            return null;
                        }
                        final Throwable cause = failure.getCause();
                        if (cause instanceof NotFoundException) {
                            asyncResponse.resume(
                                    new RestException(Status.NOT_FOUND,
                                            getSubNotFoundErrorMessage(topicName.toString(), subName)));
                        } else {
                            log.error("[{}] Failed to skip all messages {} {}",
                                    clientAppId(), topicName, subName, cause);
                            asyncResponse.resume(new RestException(cause));
                        }
                        return null;
                    });
                });
            }).exceptionally(ex -> {
                // If the exception is not redirect exception we need to log it.
                if (!isRedirectException(ex)) {
                    log.error("[{}] Failed to skip all messages for subscription {} on topic {}",
                            clientAppId(), subName, topicName, ex);
                }
                resumeAsyncResponseExceptionally(asyncResponse, ex);
                return null;
            });
}
/**
 * Clears the backlog of a subscription or replicator on a single (non-partitioned) topic.
 *
 * <p>A subscription name starting with the topic's replicator prefix is resolved to a replicator,
 * anything else to a regular subscription. A missing target resumes the response with NOT_FOUND.
 *
 * <p>Note: when {@code clearBacklog()} fails, the completion callback resumes and logs first, and the
 * failure then also reaches the trailing {@code exceptionally} handler.
 *
 * @param asyncResponse pending REST response that is resumed with the outcome
 * @param subName       subscription (or replicator cursor) name
 * @param authoritative not used by this method
 * @return a future that completes after the response handling was triggered; failures are absorbed by
 *         the trailing handler
 */
private CompletableFuture<Void> internalSkipAllMessagesForNonPartitionedTopicAsync(AsyncResponse asyncResponse,
                                                                                   String subName,
                                                                                   boolean authoritative) {
    return getTopicReferenceAsync(topicName).thenCompose(topicRef -> {
        final PersistentTopic persistentTopic = (PersistentTopic) topicRef;

        // Shared completion callback for both the replicator and the subscription path.
        final BiConsumer<Void, Throwable> onCleared = (ignored, failure) -> {
            if (failure == null) {
                asyncResponse.resume(Response.noContent().build());
                log.info("[{}] Cleared backlog on {} {}", clientAppId(), topicName, subName);
            } else {
                asyncResponse.resume(new RestException(failure));
                log.error("[{}] Failed to skip all messages {} {}",
                        clientAppId(), topicName, subName, failure);
            }
        };

        if (subName.startsWith(persistentTopic.getReplicatorPrefix())) {
            final String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
            final PersistentReplicator replicator =
                    (PersistentReplicator) persistentTopic.getPersistentReplicator(remoteCluster);
            if (replicator != null) {
                return replicator.clearBacklog().whenComplete(onCleared);
            }
            asyncResponse.resume(new RestException(Status.NOT_FOUND,
                    getSubNotFoundErrorMessage(topicName.toString(), subName)));
            return CompletableFuture.completedFuture(null);
        }

        final PersistentSubscription subscription = persistentTopic.getSubscription(subName);
        if (subscription != null) {
            return subscription.clearBacklog().whenComplete(onCleared);
        }
        asyncResponse.resume(new RestException(Status.NOT_FOUND,
                getSubNotFoundErrorMessage(topicName.toString(), subName)));
        return CompletableFuture.completedFuture(null);
    }).exceptionally(ex -> {
        // If the exception is not redirect exception we need to log it.
        if (!isRedirectException(ex)) {
            log.error("[{}] Failed to skip all messages for subscription {} on topic {}",
                    clientAppId(), subName, topicName, ex);
        }
        resumeAsyncResponseExceptionally(asyncResponse, ex);
        return null;
    });
}
/**
 * Skips {@code numMessages} messages on a subscription or replicator of a non-partitioned topic.
 *
 * <p>After the ownership and {@code SKIP} permission checks, the request is rejected with
 * METHOD_NOT_ALLOWED when the topic has partitions. A subscription name starting with the topic's
 * replicator prefix is resolved to a replicator, anything else to a regular subscription; a missing
 * topic, replicator or subscription fails the chain with a NOT_FOUND {@link RestException}. All failures
 * are reported through the trailing {@code exceptionally} handler.
 *
 * @param asyncResponse pending REST response that is resumed with the outcome
 * @param subName       subscription (or replicator cursor) name
 * @param numMessages   number of messages passed to {@code skipMessages}
 * @param authoritative forwarded to the ownership validation and the metadata lookup
 */
protected void internalSkipMessages(AsyncResponse asyncResponse, String subName, int numMessages,
                                    boolean authoritative) {
    CompletableFuture<Void> future;
    if (topicName.isGlobal()) {
        future = validateGlobalNamespaceOwnershipAsync(namespaceName);
    } else {
        future = CompletableFuture.completedFuture(null);
    }
    future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName))
            .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false)
                    .thenCompose(partitionMetadata -> {
                        if (partitionMetadata.partitions > 0) {
                            String msg = "Skip messages on a partitioned topic is not allowed";
                            log.warn("[{}] {} {} {}", clientAppId(), msg, topicName, subName);
                            throw new RestException(Status.METHOD_NOT_ALLOWED, msg);
                        }
                        return getTopicReferenceAsync(topicName).thenCompose(t -> {
                            PersistentTopic topic = (PersistentTopic) t;
                            if (topic == null) {
                                // A single RestException is enough; wrapping it in a second one added nothing.
                                throw new RestException(Status.NOT_FOUND,
                                        getTopicNotFoundErrorMessage(topicName.toString()));
                            }
                            if (subName.startsWith(topic.getReplicatorPrefix())) {
                                String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
                                PersistentReplicator repl =
                                        (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
                                if (repl == null) {
                                    return FutureUtil.failedFuture(
                                            new RestException(Status.NOT_FOUND, "Replicator not found"));
                                }
                                return repl.skipMessages(numMessages).thenAccept(unused -> {
                                    log.info("[{}] Skipped {} messages on {} {}", clientAppId(), numMessages,
                                            topicName, subName);
                                    asyncResponse.resume(Response.noContent().build());
                                });
                            } else {
                                PersistentSubscription sub = topic.getSubscription(subName);
                                if (sub == null) {
                                    return FutureUtil.failedFuture(
                                            new RestException(Status.NOT_FOUND,
                                                    getSubNotFoundErrorMessage(topicName.toString(), subName)));
                                }
                                return sub.skipMessages(numMessages).thenAccept(unused -> {
                                    log.info("[{}] Skipped {} messages on {} {}", clientAppId(), numMessages,
                                            topicName, subName);
                                    asyncResponse.resume(Response.noContent().build());
                                });
                            }
                        });
                    })
            ).exceptionally(ex -> {
                // If the exception is not redirect exception we need to log it.
                if (!isRedirectException(ex)) {
                    log.error("[{}] Failed to skip {} messages {} {}", clientAppId(), numMessages, topicName,
                            subName, ex);
                }
                resumeAsyncResponseExceptionally(asyncResponse, ex);
                return null;
            });
}
/**
 * Expires messages for every subscription of the topic, fanning out per partition when needed.
 *
 * <p>The partition metadata is always fetched because the non-partitioned implementation takes it as an
 * argument. A partition name or a topic without partitions is delegated to
 * {@link #internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic}; otherwise one request per
 * partition is issued through the admin client and the response is resumed once all of them finished.
 *
 * @param asyncResponse       pending REST response that is resumed with the outcome
 * @param expireTimeInSeconds expiry value forwarded unchanged to the per-partition / per-topic call
 * @param authoritative       forwarded to the metadata lookup and to the non-partitioned implementation
 */
protected void internalExpireMessagesForAllSubscriptions(AsyncResponse asyncResponse, int expireTimeInSeconds,
                                                         boolean authoritative) {
    final CompletableFuture<Void> namespaceCheck = topicName.isGlobal()
            ? validateGlobalNamespaceOwnershipAsync(namespaceName)
            : CompletableFuture.completedFuture(null);

    namespaceCheck.thenCompose(__ ->
            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
                    .thenAccept(metadata -> {
                        if (topicName.isPartitioned() || metadata.partitions <= 0) {
                            internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
                                    metadata, expireTimeInSeconds, authoritative);
                            return;
                        }

                        final List<CompletableFuture<Void>> expirations =
                                Lists.newArrayListWithCapacity(metadata.partitions);
                        // expire messages for each partition topic
                        for (int idx = 0; idx < metadata.partitions; idx++) {
                            final TopicName partitionName = topicName.getPartition(idx);
                            try {
                                expirations.add(pulsar()
                                        .getAdminClient()
                                        .topics()
                                        .expireMessagesForAllSubscriptionsAsync(
                                                partitionName.toString(), expireTimeInSeconds));
                            } catch (Exception e) {
                                log.error("[{}] Failed to expire messages up to {} on {}",
                                        clientAppId(), expireTimeInSeconds,
                                        partitionName, e);
                                asyncResponse.resume(new RestException(e));
                                return;
                            }
                        }

                        FutureUtil.waitForAll(expirations).handle((ignored, failure) -> {
                            if (failure == null) {
                                asyncResponse.resume(Response.noContent().build());
                                return null;
                            }
                            final Throwable cause = failure.getCause();
                            log.error("[{}] Failed to expire messages up to {} on {}",
                                    clientAppId(), expireTimeInSeconds,
                                    topicName, cause);
                            asyncResponse.resume(new RestException(cause));
                            return null;
                        });
                    })
    ).exceptionally(ex -> {
        // If the exception is not redirect exception we need to log it.
        if (!isRedirectException(ex)) {
            log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName,
                    ex);
        }
        resumeAsyncResponseExceptionally(asyncResponse, ex);
        return null;
    });
}
/**
 * Expires messages for every replicator and subscription of a single (non-partitioned) topic.
 *
 * <p>After the ownership and {@code EXPIRE_MESSAGES} permission checks, the topic is looked up. A missing
 * topic yields NOT_FOUND and a topic that is not a {@link PersistentTopic} yields METHOD_NOT_ALLOWED.
 * Otherwise one expiry is started per replicator and per subscription name, and the response is resumed
 * once all of them finished.
 *
 * @param asyncResponse       pending REST response that is resumed with the outcome
 * @param partitionMetadata   metadata forwarded to the per-subscription expiry call
 * @param expireTimeInSeconds expiry value forwarded unchanged to the per-subscription expiry call
 * @param authoritative       forwarded to the topic ownership validation
 */
private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse,
                                                                             PartitionedTopicMetadata
                                                                                     partitionMetadata,
                                                                             int expireTimeInSeconds,
                                                                             boolean authoritative) {
    // validate ownership and redirect if current broker is not owner
    validateTopicOwnershipAsync(topicName, authoritative)
            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES))
            .thenCompose(__ -> getTopicReferenceAsync(topicName).thenAccept(t -> {
                if (t == null) {
                    resumeAsyncResponseExceptionally(asyncResponse, new RestException(Status.NOT_FOUND,
                            getTopicNotFoundErrorMessage(topicName.toString())));
                    return;
                }
                if (!(t instanceof PersistentTopic)) {
                    resumeAsyncResponseExceptionally(asyncResponse, new RestException(Status.METHOD_NOT_ALLOWED,
                            "Expire messages for all subscriptions on a non-persistent topic is not allowed"));
                    return;
                }
                PersistentTopic topic = (PersistentTopic) t;
                // Replicator cursors first, then regular subscriptions.
                List<String> subNames =
                        Lists.newArrayListWithCapacity((int) topic.getReplicators().size()
                                + (int) topic.getSubscriptions().size());
                subNames.addAll(topic.getReplicators().keys());
                subNames.addAll(topic.getSubscriptions().keys());
                // One future per name, so size the list from the names rather than from the replicators only.
                final List<CompletableFuture<Void>> futures = Lists.newArrayListWithCapacity(subNames.size());
                for (String name : subNames) {
                    try {
                        futures.add(internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata,
                                name, expireTimeInSeconds));
                    } catch (Exception e) {
                        log.error("[{}] Failed to expire messages for all subscription up to {} on {}",
                                clientAppId(), expireTimeInSeconds, topicName, e);
                        asyncResponse.resume(new RestException(e));
                        return;
                    }
                }
                FutureUtil.waitForAll(futures).handle((result, exception) -> {
                    if (exception != null) {
                        Throwable throwable = FutureUtil.unwrapCompletionException(exception);
                        log.error("[{}] Failed to expire messages for all subscription up to {} on {}",
                                clientAppId(), expireTimeInSeconds, topicName, throwable);
                        asyncResponse.resume(new RestException(throwable));
                        return null;
                    }
                    asyncResponse.resume(Response.noContent().build());
                    return null;
                });
            })
    ).exceptionally(ex -> {
        // If the exception is not redirect exception we need to log it.
        if (!isRedirectException(ex)) {
            log.error("[{}] Failed to expire messages for all subscription up to {} on {}", clientAppId(),
                    expireTimeInSeconds, topicName, ex);
        }
        resumeAsyncResponseExceptionally(asyncResponse, ex);
        return null;
    });
}
/**
 * Resets a subscription cursor to the given timestamp.
 *
 * <p>For a global topic the namespace ownership is validated first, followed by the topic ownership and
 * the {@code RESET_CURSOR} permission checks. A partition name goes straight to the non-partitioned
 * implementation; any other name goes through the partitioned implementation, which looks up the
 * partition metadata.
 *
 * @param subName       name of the subscription to reset
 * @param timestamp     target time forwarded to the reset implementation
 * @param authoritative forwarded to the ownership validation and the reset implementation
 * @return a future that completes when the reset finished, or fails with the underlying error
 */
protected CompletableFuture<Void> internalResetCursorAsync(String subName, long timestamp,
                                                           boolean authoritative) {
    final CompletableFuture<Void> namespaceCheck = topicName.isGlobal()
            ? validateGlobalNamespaceOwnershipAsync(namespaceName)
            : CompletableFuture.completedFuture(null);

    return namespaceCheck
            .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.RESET_CURSOR, subName))
            // If the topic name is a partition name, no need to get partition topic metadata again
            .thenCompose(__ -> topicName.isPartitioned()
                    ? internalResetCursorForNonPartitionedTopic(subName, timestamp, authoritative)
                    : internalResetCursorForPartitionedTopic(subName, timestamp, authoritative));
}
/**
 * Resets a subscription cursor to {@code timestamp} on every partition of the topic.
 *
 * <p>The partition metadata is looked up first. With no partitions the call is delegated to
 * {@link #internalResetCursorForNonPartitionedTopic}. Otherwise one reset request per partition is issued
 * through the admin client. {@code PreconditionFailedException} failures are counted and tolerated unless
 * every partition reports one; any other failure fails the result immediately.
 *
 * @param subName       name of the subscription to reset
 * @param timestamp     target time forwarded to each partition
 * @param authoritative forwarded to the metadata lookup and the non-partitioned implementation
 * @return a future that completes when all partitions answered, or fails with the first
 *         non-precondition error, or with a PRECONDITION_FAILED {@link RestException} when all
 *         partitions failed their precondition
 */
private CompletableFuture<Void> internalResetCursorForPartitionedTopic(String subName, long timestamp,
boolean authoritative) {
return getPartitionedTopicMetadataAsync(topicName, authoritative, false)
.thenCompose(partitionMetadata -> {
final int numPartitions = partitionMetadata.partitions;
if (numPartitions > 0) {
// Aggregate result of the per-partition requests.
final CompletableFuture<Void> future = new CompletableFuture<>();
// Outstanding partitions; reaching zero completes the aggregate future normally.
final AtomicInteger count = new AtomicInteger(numPartitions);
// Number of partitions that failed with PreconditionFailedException.
final AtomicInteger failureCount = new AtomicInteger(0);
// Most recently recorded PreconditionFailedException.
final AtomicReference<Throwable> partitionException = new AtomicReference<>();
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
pulsar().getAdminClient().topics()
.resetCursorAsync(topicNamePartition.toString(),
subName, timestamp).handle((r, ex) -> {
if (ex != null) {
if (ex instanceof PreconditionFailedException) {
// throw the last exception if all partitions get this error
// any other exception on partition is reported back to user
failureCount.incrementAndGet();
partitionException.set(ex);
} else {
log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
clientAppId(), topicNamePartition, subName, timestamp, ex);
// Fail fast; the outstanding counter is intentionally not decremented on this path.
future.completeExceptionally(ex);
return null;
}
}
if (count.decrementAndGet() == 0) {
future.complete(null);
}
return null;
});
} catch (Exception e) {
// Issuing the request itself failed; the loop still continues with the next partition.
log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(),
topicNamePartition, subName, timestamp, e);
future.completeExceptionally(e);
}
}
// This callback runs for both normal and exceptional completion of the aggregate future.
return future.whenComplete((r, ex) -> {
// report an error to user if unable to reset for all partitions
if (failureCount.get() == numPartitions) {
log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
clientAppId(), topicName,
subName, timestamp, partitionException.get());
throw new RestException(Status.PRECONDITION_FAILED, partitionException.get().getMessage());
} else if (failureCount.get() > 0) {
log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}",
clientAppId(), topicName, subName, timestamp, partitionException.get());
}
});
} else {
return internalResetCursorForNonPartitionedTopic(subName, timestamp, authoritative);
}
});
}
/**
 * Resets a subscription cursor to {@code timestamp} on a non-partitioned topic or a single partition.
 *
 * <p>Topic ownership and the {@code RESET_CURSOR} permission are validated before the topic is looked up.
 * A missing subscription fails the returned future with a NOT_FOUND {@link RestException}.
 *
 * @param subName       name of the subscription to reset
 * @param timestamp     target time handed to {@code Subscription#resetCursor}
 * @param authoritative forwarded to the topic ownership validation
 * @return a future that completes once the cursor was reset and the success was logged
 */
private CompletableFuture<Void> internalResetCursorForNonPartitionedTopic(String subName, long timestamp,
                                                                          boolean authoritative) {
    return validateTopicOwnershipAsync(topicName, authoritative)
            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.RESET_CURSOR, subName))
            .thenCompose(__ -> {
                log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
                        clientAppId(), topicName, subName, timestamp);
                return getTopicReferenceAsync(topicName);
            })
            .thenCompose(topicRef -> {
                final Subscription subscription = topicRef.getSubscription(subName);
                if (subscription != null) {
                    return subscription.resetCursor(timestamp);
                }
                throw new RestException(Status.NOT_FOUND,
                        getSubNotFoundErrorMessage(topicName.toString(), subName));
            })
            .thenRun(() -> {
                log.info("[{}][{}] Reset cursor on subscription {} to time {}",
                        clientAppId(), topicName, subName, timestamp);
            });
}
/**
 * Creates a subscription at the given message id, fanning out to every partition when needed.
 *
 * <p>A {@code null} message id is treated as {@code MessageId.latest}. A partition name or a topic without
 * partitions is delegated to {@link #internalCreateSubscriptionForNonPartitionedTopic}. For a partitioned
 * topic one create request per partition is issued through the admin client. A
 * {@code PulsarAdminException.ConflictException} from a partition is tolerated unless every partition
 * failed; any other partition failure is reported to the caller.
 *
 * @param asyncResponse    pending REST response that is resumed with the outcome
 * @param subscriptionName name of the subscription to create
 * @param messageId        initial position, or {@code null} for the latest message
 * @param authoritative    forwarded to the metadata lookup and the non-partitioned implementation
 * @param replicated       replicated flag forwarded to the per-partition and the non-partitioned creation
 * @param properties       subscription properties forwarded to the creation call
 */
protected void internalCreateSubscription(AsyncResponse asyncResponse, String subscriptionName,
        MessageIdImpl messageId, boolean authoritative, boolean replicated, Map<String, String> properties) {
    CompletableFuture<Void> ret;
    if (topicName.isGlobal()) {
        ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
    } else {
        ret = CompletableFuture.completedFuture(null);
    }
    ret.thenAccept(__ -> {
        final MessageIdImpl targetMessageId = messageId == null ? (MessageIdImpl) MessageId.latest : messageId;
        log.info("[{}][{}] Creating subscription {} at message id {} with properties {}", clientAppId(),
                topicName, subscriptionName, targetMessageId, properties);
        // If the topic name is a partition name, no need to get partition topic metadata again
        if (topicName.isPartitioned()) {
            internalCreateSubscriptionForNonPartitionedTopic(asyncResponse,
                    subscriptionName, targetMessageId, authoritative, replicated, properties);
        } else {
            boolean allowAutoTopicCreation = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName);
            getPartitionedTopicMetadataAsync(topicName,
                    authoritative, allowAutoTopicCreation).thenAccept(partitionMetadata -> {
                final int numPartitions = partitionMetadata.partitions;
                if (numPartitions > 0) {
                    // Aggregate result; completes normally once every partition answered.
                    final CompletableFuture<Void> future = new CompletableFuture<>();
                    final AtomicInteger count = new AtomicInteger(numPartitions);
                    final AtomicInteger failureCount = new AtomicInteger(0);
                    final AtomicReference<Throwable> partitionException = new AtomicReference<>();
                    // Create the subscription on each partition
                    for (int i = 0; i < numPartitions; i++) {
                        TopicName topicNamePartition = topicName.getPartition(i);
                        try {
                            // Forward the caller's replicated flag; it used to be hard-coded to false here,
                            // unlike the non-partitioned path.
                            pulsar().getAdminClient().topics()
                                    .createSubscriptionAsync(topicNamePartition.toString(),
                                            subscriptionName, targetMessageId, replicated, properties)
                                    .handle((r, ex) -> {
                                        if (ex != null) {
                                            // fail the operation on unknown exception or
                                            // if all the partitioned failed due to
                                            // subscription-already-exist
                                            if (failureCount.incrementAndGet() == numPartitions
                                                    || !(ex instanceof PulsarAdminException.ConflictException)) {
                                                partitionException.set(ex);
                                            }
                                        }
                                        if (count.decrementAndGet() == 0) {
                                            future.complete(null);
                                        }
                                        return null;
                                    });
                        } catch (Exception e) {
                            log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(),
                                    topicNamePartition, subscriptionName, targetMessageId, e);
                            future.completeExceptionally(e);
                        }
                    }
                    future.whenComplete((r, ex) -> {
                        if (ex != null) {
                            if (ex instanceof PulsarAdminException) {
                                asyncResponse.resume(new RestException((PulsarAdminException) ex));
                                return;
                            } else {
                                asyncResponse.resume(new RestException(ex));
                                return;
                            }
                        }
                        if (partitionException.get() != null) {
                            log.warn("[{}] [{}] Failed to create subscription {} at message id {}",
                                    clientAppId(), topicName,
                                    subscriptionName, targetMessageId, partitionException.get());
                            if (partitionException.get() instanceof PulsarAdminException) {
                                asyncResponse.resume(
                                        new RestException((PulsarAdminException) partitionException.get()));
                                return;
                            } else {
                                asyncResponse.resume(new RestException(partitionException.get()));
                                return;
                            }
                        }
                        asyncResponse.resume(Response.noContent().build());
                    });
                } else {
                    internalCreateSubscriptionForNonPartitionedTopic(asyncResponse,
                            subscriptionName, targetMessageId, authoritative, replicated, properties);
                }
            }).exceptionally(ex -> {
                // If the exception is not redirect exception we need to log it.
                if (!isRedirectException(ex)) {
                    log.error("[{}] Failed to create subscription {} on topic {}",
                            clientAppId(), subscriptionName, topicName, ex);
                }
                resumeAsyncResponseExceptionally(asyncResponse, ex);
                return null;
            });
        }
    }).exceptionally(ex -> {
        // If the exception is not redirect exception we need to log it.
        if (!isRedirectException(ex)) {
            log.error("[{}] Failed to create subscription {} on topic {}",
                    clientAppId(), subscriptionName, topicName, ex);
        }
        resumeAsyncResponseExceptionally(asyncResponse, ex);
        return null;
    });
}
/**
 * Creates a subscription on a non-partitioned topic or a single partition and positions its cursor.
 *
 * <p>After the ownership and {@code SUBSCRIBE} permission checks the topic is obtained from the broker
 * service (auto-creation is governed by {@code isAllowAutoTopicCreation}). An absent topic yields
 * PRECONDITION_FAILED and an already existing subscription yields CONFLICT. The new subscription's cursor
 * is deactivated and then reset to {@code targetMessageId}.
 *
 * @param asyncResponse    pending REST response that is resumed with the outcome
 * @param subscriptionName name of the subscription to create
 * @param targetMessageId  position the cursor is reset to after creation
 * @param authoritative    forwarded to the topic ownership validation
 * @param replicated       replicated flag handed to {@code Topic#createSubscription}
 * @param properties       subscription properties handed to {@code Topic#createSubscription}
 */
private void internalCreateSubscriptionForNonPartitionedTopic(
        AsyncResponse asyncResponse, String subscriptionName,
        MessageIdImpl targetMessageId, boolean authoritative, boolean replicated,
        Map<String, String> properties) {
    boolean isAllowAutoTopicCreation = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName);
    validateTopicOwnershipAsync(topicName, authoritative)
            // Asynchronous permission check, consistent with the other operations in this class.
            .thenCompose(__ ->
                    validateTopicOperationAsync(topicName, TopicOperation.SUBSCRIBE, subscriptionName))
            .thenCompose(__ ->
                    pulsar().getBrokerService().getTopic(topicName.toString(), isAllowAutoTopicCreation))
            .thenApply(optTopic -> {
                if (optTopic.isPresent()) {
                    return optTopic.get();
                } else {
                    throw new RestException(Status.PRECONDITION_FAILED,
                            "Topic does not exist and cannot be auto-created");
                }
            }).thenCompose(topic -> {
                if (topic.getSubscriptions().containsKey(subscriptionName)) {
                    throw new RestException(Status.CONFLICT, "Subscription already exists for topic");
                }
                return topic.createSubscription(subscriptionName, InitialPosition.Latest, replicated, properties);
            }).thenCompose(subscription -> {
                // Mark the cursor as "inactive" as it was created without a real consumer connected
                ((PersistentSubscription) subscription).deactivateCursor();
                return subscription.resetCursor(
                        PositionImpl.get(targetMessageId.getLedgerId(), targetMessageId.getEntryId()));
            }).thenRun(() -> {
                log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(),
                        topicName, subscriptionName, targetMessageId);
                asyncResponse.resume(Response.noContent().build());
            }).exceptionally(ex -> {
                Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
                // WebApplicationExceptions carry their own response and are not logged here.
                if (!(t instanceof WebApplicationException)) {
                    log.warn("[{}][{}] Failed to create subscription {} at message id {}", clientAppId(), topicName,
                            subscriptionName, targetMessageId, t);
                }
                if (t instanceof SubscriptionInvalidCursorPosition) {
                    asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
                            "Unable to find position for position specified: " + t.getMessage()));
                } else if (t instanceof SubscriptionBusyException) {
                    asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
                            "Failed for Subscription Busy: " + t.getMessage()));
                } else {
                    resumeAsyncResponseExceptionally(asyncResponse, t);
                }
                return null;
            });
}
/**
 * Updates the properties of a subscription, fanning out to every partition when the topic is partitioned.
 *
 * <p>After the namespace (global topics only) and topic ownership checks, a partition name or a topic
 * without partitions is delegated to {@link #internalUpdateSubscriptionPropertiesForNonPartitionedTopic}.
 * For a partitioned topic one update request per partition is issued through the admin client, and the
 * response is resumed once all of them finished.
 *
 * @param asyncResponse          pending REST response that is resumed with the outcome
 * @param subName                name of the subscription to update
 * @param subscriptionProperties properties forwarded to each update call
 * @param authoritative          forwarded to the ownership validation and the metadata lookup
 */
protected void internalUpdateSubscriptionProperties(AsyncResponse asyncResponse, String subName,
                                                    Map<String, String> subscriptionProperties,
                                                    boolean authoritative) {
    final CompletableFuture<Void> namespaceCheck = topicName.isGlobal()
            ? validateGlobalNamespaceOwnershipAsync(namespaceName)
            : CompletableFuture.completedFuture(null);

    namespaceCheck.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> {
        if (topicName.isPartitioned()) {
            internalUpdateSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
                    subscriptionProperties, authoritative);
            return;
        }
        getPartitionedTopicMetadataAsync(topicName,
                authoritative, false).thenAcceptAsync(metadata -> {
            if (metadata.partitions <= 0) {
                internalUpdateSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
                        subscriptionProperties, authoritative);
                return;
            }

            final List<CompletableFuture<Void>> updates = Lists.newArrayList();
            for (int idx = 0; idx < metadata.partitions; idx++) {
                final TopicName partitionName = topicName.getPartition(idx);
                try {
                    updates.add(pulsar().getAdminClient().topics()
                            .updateSubscriptionPropertiesAsync(partitionName.toString(),
                                    subName, subscriptionProperties));
                } catch (Exception e) {
                    log.error("[{}] Failed to update properties for subscription {} {}",
                            clientAppId(), partitionName, subName,
                            e);
                    asyncResponse.resume(new RestException(e));
                    return;
                }
            }

            FutureUtil.waitForAll(updates).handle((ignored, failure) -> {
                if (failure == null) {
                    asyncResponse.resume(Response.noContent().build());
                    return null;
                }
                final Throwable cause = failure.getCause();
                if (cause instanceof NotFoundException) {
                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
                            getSubNotFoundErrorMessage(topicName.toString(), subName)));
                } else if (cause instanceof PreconditionFailedException) {
                    asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
                            "Subscription has active connected consumers"));
                } else {
                    log.error("[{}] Failed to update properties for subscription {} {}",
                            clientAppId(), topicName, subName, cause);
                    asyncResponse.resume(new RestException(cause));
                }
                return null;
            });
        }, pulsar().getExecutor()).exceptionally(ex -> {
            log.error("[{}] Failed to update properties for subscription {} from topic {}",
                    clientAppId(), subName, topicName, ex);
            resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }).exceptionally(ex -> {
        // If the exception is not redirect exception we need to log it.
        if (!isRedirectException(ex)) {
            log.error("[{}] Failed to update subscription {} from topic {}",
                    clientAppId(), subName, topicName, ex);
        }
        resumeAsyncResponseExceptionally(asyncResponse, ex);
        return null;
    });
}
/**
 * Entry point for analyzing the backlog of a subscription.
 *
 * <p>After the namespace (global topics only) and topic ownership checks, the request is rejected with
 * METHOD_NOT_ALLOWED when the name refers to a partitioned topic as a whole. Otherwise the work is handed
 * to {@code internalAnalyzeSubscriptionBacklogForNonPartitionedTopic}.
 *
 * @param asyncResponse pending REST response that is resumed with the outcome
 * @param subName       name of the subscription to analyze
 * @param position      optional position forwarded to the non-partitioned implementation
 * @param authoritative forwarded to the ownership validation and the metadata lookup
 */
protected void internalAnalyzeSubscriptionBacklog(AsyncResponse asyncResponse, String subName,
                                                  Optional<Position> position,
                                                  boolean authoritative) {
    final CompletableFuture<Void> namespaceCheck = topicName.isGlobal()
            ? validateGlobalNamespaceOwnershipAsync(namespaceName)
            : CompletableFuture.completedFuture(null);

    namespaceCheck
            .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
            .thenCompose(__ -> {
                // A partition name needs no metadata lookup.
                if (topicName.isPartitioned()) {
                    return CompletableFuture.completedFuture(null);
                }
                return getPartitionedTopicMetadataAsync(topicName, authoritative, false)
                        .thenAccept(metadata -> {
                            if (metadata.partitions > 0) {
                                throw new RestException(Status.METHOD_NOT_ALLOWED,
                                        "Analyze backlog on a partitioned topic is not allowed, "
                                                + "please try do it on specific topic partition");
                            }
                        });
            })
            .thenAccept(__ -> internalAnalyzeSubscriptionBacklogForNonPartitionedTopic(asyncResponse, subName,
                    position, authoritative))
            .exceptionally(ex -> {
                // If the exception is not redirect exception we need to log it.
                if (!isRedirectException(ex)) {
                    log.error("[{}] Failed to analyze back log of subscription {} from topic {}",
                            clientAppId(), subName, topicName, ex);
                }
                resumeAsyncResponseExceptionally(asyncResponse, ex);
                return null;
            });
}
/**
 * Reads the properties of a subscription, merging the per-partition results for a partitioned topic.
 *
 * <p>After the namespace (global topics only) and topic ownership checks, a partition name or a topic
 * without partitions is delegated to {@link #internalGetSubscriptionPropertiesForNonPartitionedTopic}.
 * For a partitioned topic the properties of every partition are requested through the admin client and
 * merged into one map before the response is resumed.
 *
 * @param asyncResponse pending REST response that is resumed with the outcome
 * @param subName       name of the subscription whose properties are requested
 * @param authoritative forwarded to the ownership validation and the metadata lookup
 */
protected void internalGetSubscriptionProperties(AsyncResponse asyncResponse, String subName,
                                                 boolean authoritative) {
    CompletableFuture<Void> future;
    if (topicName.isGlobal()) {
        future = validateGlobalNamespaceOwnershipAsync(namespaceName);
    } else {
        future = CompletableFuture.completedFuture(null);
    }
    future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> {
        if (topicName.isPartitioned()) {
            internalGetSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
                    authoritative);
        } else {
            getPartitionedTopicMetadataAsync(topicName,
                    authoritative, false).thenAcceptAsync(partitionMetadata -> {
                if (partitionMetadata.partitions > 0) {
                    final List<CompletableFuture<Map<String, String>>> futures = Lists.newArrayList();
                    for (int i = 0; i < partitionMetadata.partitions; i++) {
                        TopicName topicNamePartition = topicName.getPartition(i);
                        try {
                            futures.add(pulsar().getAdminClient().topics()
                                    .getSubscriptionPropertiesAsync(topicNamePartition.toString(),
                                            subName));
                        } catch (Exception e) {
                            log.error("[{}] Failed to get properties for subscription {} {}",
                                    clientAppId(), topicNamePartition, subName,
                                    e);
                            asyncResponse.resume(new RestException(e));
                            return;
                        }
                    }
                    FutureUtil.waitForAll(futures).handle((result, exception) -> {
                        if (exception != null) {
                            Throwable t = exception.getCause();
                            if (t instanceof NotFoundException) {
                                asyncResponse.resume(new RestException(Status.NOT_FOUND,
                                        getSubNotFoundErrorMessage(topicName.toString(), subName)));
                                return null;
                            } else {
                                log.error("[{}] Failed to get properties for subscription {} {}",
                                        clientAppId(), topicName, subName, t);
                                asyncResponse.resume(new RestException(t));
                                return null;
                            }
                        }
                        Map<String, String> aggregatedResult = new HashMap<>();
                        // in theory all the partitions have the same properties
                        for (CompletableFuture<Map<String, String>> f : futures) {
                            try {
                                aggregatedResult.putAll(f.get());
                            } catch (Exception impossible) {
                                // we already waited for this Future; stop here so the response
                                // is resumed only once
                                asyncResponse.resume(new RestException(impossible));
                                return null;
                            }
                        }
                        asyncResponse.resume(Response.ok(aggregatedResult).build());
                        return null;
                    });
                } else {
                    internalGetSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
                            authoritative);
                }
            }, pulsar().getExecutor()).exceptionally(ex -> {
                log.error("[{}] Failed to get properties for subscription {} from topic {}",
                        clientAppId(), subName, topicName, ex);
                resumeAsyncResponseExceptionally(asyncResponse, ex);
                return null;
            });
        }
    }).exceptionally(ex -> {
        // If the exception is not redirect exception we need to log it.
        if (!isRedirectException(ex)) {
            log.error("[{}] Failed to get properties for subscription {} from topic {}",
                    clientAppId(), subName, topicName, ex);
        }
        resumeAsyncResponseExceptionally(asyncResponse, ex);
        return null;
    });
}
/**
 * Resets the cursor of subscription {@code subName} to the position described by {@code messageId}
 * and resumes {@code asyncResponse} with the outcome.
 *
 * <p>The request is answered with {@code METHOD_NOT_ALLOWED} when the topic turns out to be partitioned,
 * with {@code NOT_FOUND} when the topic or the subscription does not exist, and with
 * {@code PRECONDITION_FAILED} when the cursor reports an invalid position or a busy subscription.
 * Every other failure is delegated to {@code resumeAsyncResponseExceptionally}.
 *
 * @param asyncResponse the pending REST response to resume
 * @param subName       name of the subscription whose cursor is reset
 * @param authoritative forwarded to the metadata and ownership lookups
 * @param messageId     ledger/entry coordinates of the target position
 * @param isExcluded    forwarded to {@code calculatePositionAckSet} to decide whether the addressed
 *                      message itself is skipped
 * @param batchIndex    index inside the batch entry; forwarded to {@code getEntryBatchSize} and
 *                      {@code calculatePositionAckSet}
 */
protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String subName, boolean authoritative,
        MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
    CompletableFuture<Void> ret;
    if (topicName.isGlobal()) {
        ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
    } else {
        ret = CompletableFuture.completedFuture(null);
    }
    ret.thenAccept(__ -> {
        log.info("[{}][{}] received reset cursor on subscription {} to position {}", clientAppId(), topicName,
                subName, messageId);
        // If the topic name is a partition name, no need to get partition topic metadata again
        if (!topicName.isPartitioned()
                && getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
            log.warn("[{}] Not supported operation on partitioned-topic {} {}", clientAppId(), topicName,
                    subName);
            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
                    "Reset-cursor at position is not allowed for partitioned-topic"));
            return;
        }
        validateTopicOwnershipAsync(topicName, authoritative)
                .thenCompose(ignore ->
                        validateTopicOperationAsync(topicName, TopicOperation.RESET_CURSOR, subName))
                .thenCompose(ignore -> getTopicReferenceAsync(topicName))
                .thenAccept(topic -> {
                    if (topic == null) {
                        asyncResponse.resume(new RestException(Status.NOT_FOUND,
                                getTopicNotFoundErrorMessage(topicName.toString())));
                        return;
                    }
                    PersistentSubscription sub = ((PersistentTopic) topic).getSubscription(subName);
                    if (sub == null) {
                        asyncResponse.resume(new RestException(Status.NOT_FOUND,
                                getSubNotFoundErrorMessage(topicName.toString(), subName)));
                        return;
                    }
                    CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>();
                    getEntryBatchSize(batchSizeFuture, (PersistentTopic) topic, messageId, batchIndex);
                    batchSizeFuture.thenAccept(bi -> {
                        PositionImpl seekPosition = calculatePositionAckSet(isExcluded, bi, batchIndex,
                                messageId);
                        sub.resetCursor(seekPosition).thenRun(() -> {
                            log.info("[{}][{}] successfully reset cursor on subscription {}"
                                            + " to position {}", clientAppId(),
                                    topicName, subName, messageId);
                            asyncResponse.resume(Response.noContent().build());
                        }).exceptionally(ex -> {
                            Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
                            log.warn("[{}][{}] Failed to reset cursor on subscription {}"
                                            + " to position {}", clientAppId(),
                                    topicName, subName, messageId, t);
                            if (t instanceof SubscriptionInvalidCursorPosition) {
                                asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
                                        "Unable to find position for position specified: "
                                                + t.getMessage()));
                            } else if (t instanceof SubscriptionBusyException) {
                                asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
                                        "Failed for Subscription Busy: " + t.getMessage()));
                            } else {
                                resumeAsyncResponseExceptionally(asyncResponse, t);
                            }
                            return null;
                        });
                    }).exceptionally(e -> {
                        // The throwable seen here is normally a CompletionException wrapping the
                        // RestException produced by getEntryBatchSize. Resuming with it directly would
                        // discard the intended status, so use the same helper as the other failure paths.
                        // NOTE(review): relies on the helper unwrapping CompletionException, as the other
                        // call sites in this class already assume - confirm.
                        resumeAsyncResponseExceptionally(asyncResponse, e);
                        return null;
                    });
                }).exceptionally(ex -> {
                    // If the exception is not redirect exception we need to log it.
                    if (!isRedirectException(ex)) {
                        log.warn("[{}][{}] Failed to reset cursor on subscription {} to position {}",
                                clientAppId(), topicName, subName, messageId, ex.getCause());
                    }
                    resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
                    return null;
                });
    }).exceptionally(ex -> {
        // If the exception is not redirect exception we need to log it.
        if (!isRedirectException(ex)) {
            log.warn("[{}][{}] Failed to reset cursor on subscription {} to position {}",
                    clientAppId(), topicName, subName, messageId, ex.getCause());
        }
        resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
        return null;
    });
}
/**
 * Looks up how many messages the entry addressed by {@code messageId} holds and completes
 * {@code batchSizeFuture} with that number.
 *
 * <p>The future is completed with {@code 0} when {@code batchIndex} is negative, when the entry cannot
 * be read, or when the read yields no entry. It is completed exceptionally with a {@code RestException}
 * when the metadata cannot be parsed or when scheduling the read fails.
 *
 * @param batchSizeFuture future that receives the batch size
 * @param topic           topic whose managed ledger is read
 * @param messageId       ledger/entry coordinates of the entry to inspect
 * @param batchIndex      requested batch index; a negative value skips the read entirely
 */
private void getEntryBatchSize(CompletableFuture<Integer> batchSizeFuture, PersistentTopic topic,
        MessageIdImpl messageId, int batchIndex) {
    if (batchIndex < 0) {
        // No batch index requested, so there is nothing to read.
        batchSizeFuture.complete(0);
        return;
    }
    try {
        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger();
        managedLedger.asyncReadEntry(new PositionImpl(messageId.getLedgerId(), messageId.getEntryId()),
                new AsyncCallbacks.ReadEntryCallback() {
                    @Override
                    public void readEntryComplete(Entry entry, Object ctx) {
                        try {
                            if (entry != null) {
                                MessageMetadata entryMetadata =
                                        Commands.parseMessageMetadata(entry.getDataBuffer());
                                batchSizeFuture.complete(entryMetadata.getNumMessagesInBatch());
                            } else {
                                batchSizeFuture.complete(0);
                            }
                        } catch (Exception e) {
                            batchSizeFuture.completeExceptionally(new RestException(e));
                        } finally {
                            // The entry is released on every path once the metadata has been inspected.
                            if (entry != null) {
                                entry.release();
                            }
                        }
                    }

                    @Override
                    public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                        // Since we can't read the message from the storage layer,
                        // it might be an already deleted message ID or an invalid message ID.
                        // We should fall back to non batch index seek.
                        batchSizeFuture.complete(0);
                    }
                }, null);
    } catch (NullPointerException npe) {
        batchSizeFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Message not found"));
    } catch (Exception exception) {
        log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}",
                clientAppId(), messageId.getLedgerId(), messageId.getEntryId(), topicName, exception);
        batchSizeFuture.completeExceptionally(new RestException(exception));
    }
}
/**
 * Translates a message id plus batch information into the position a cursor should seek to.
 *
 * <p>Without batch information ({@code batchSize <= 0}) the result is the entry position itself, or its
 * successor when {@code isExcluded} is set. With batch information a bit set covering
 * {@code [0, batchSize)} is built, the leading bits are cleared, and the remainder is attached to the
 * position as its ack set. When excluding leaves no bit set, the next entry position is returned instead.
 *
 * @param isExcluded whether the message at {@code batchIndex} itself is skipped
 * @param batchSize  number of messages in the entry; non-positive means "no batch information"
 * @param batchIndex index of the addressed message inside the entry
 * @param messageId  ledger/entry coordinates of the entry
 * @return the position to hand to the cursor reset
 */
private PositionImpl calculatePositionAckSet(boolean isExcluded, int batchSize,
        int batchIndex, MessageIdImpl messageId) {
    if (!(batchSize > 0)) {
        PositionImpl entryPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
        return isExcluded ? entryPosition.getNext() : entryPosition;
    }

    PositionImpl target;
    BitSetRecyclable pending = BitSetRecyclable.create();
    pending.set(0, batchSize);
    if (isExcluded) {
        // Drop the addressed message as well as everything before it.
        pending.clear(0, Math.max(batchIndex + 1, 0));
        if (pending.length() > 0) {
            long[] ackSet = pending.toLongArray();
            target = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId(), ackSet);
        } else {
            // Nothing is left in this entry, so continue with the following one.
            target = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId()).getNext();
        }
    } else if (batchIndex - 1 >= 0) {
        // Keep the addressed message, drop only what precedes it.
        pending.clear(0, batchIndex);
        long[] ackSet = pending.toLongArray();
        target = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId(), ackSet);
    } else {
        target = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
    }
    pending.recycle();
    return target;
}
/**
 * Reads the entry stored at {@code ledgerId}/{@code entryId} of this topic and resumes
 * {@code asyncResponse} with a response built by {@code generateResponseWithEntry}.
 *
 * <p>Before reading, topic ownership, the {@code PEEK_MESSAGES} operation and (for global topics) the
 * global namespace ownership are validated. Any failure resumes the response with an error.
 *
 * @param asyncResponse the pending REST response to resume
 * @param ledgerId      ledger id of the entry to read
 * @param entryId       entry id of the entry to read
 * @param authoritative forwarded to the topic ownership validation
 */
protected void internalGetMessageById(AsyncResponse asyncResponse, long ledgerId, long entryId,
        boolean authoritative) {
    // will redirect if the topic not owned by current broker
    validateTopicOwnershipAsync(topicName, authoritative)
            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES))
            .thenCompose(__ -> {
                CompletableFuture<Void> ret;
                if (topicName.isGlobal()) {
                    ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
                } else {
                    ret = CompletableFuture.completedFuture(null);
                }
                return ret;
            })
            .thenCompose(__ -> getTopicReferenceAsync(topicName))
            .thenAccept(topic -> {
                ManagedLedgerImpl ledger =
                        (ManagedLedgerImpl) ((PersistentTopic) topic).getManagedLedger();
                ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId),
                        new AsyncCallbacks.ReadEntryCallback() {
                            @Override
                            public void readEntryFailed(ManagedLedgerException exception,
                                                        Object ctx) {
                                asyncResponse.resume(new RestException(exception));
                            }

                            @Override
                            public void readEntryComplete(Entry entry, Object ctx) {
                                try {
                                    asyncResponse.resume(generateResponseWithEntry(entry));
                                } catch (Exception exception) {
                                    // Catch every failure, not only IOException: this callback runs
                                    // outside the future chain, so an escaping runtime exception would
                                    // leave the response un-resumed.
                                    log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}",
                                            clientAppId(), ledgerId, entryId, topicName, exception);
                                    asyncResponse.resume(new RestException(exception));
                                } finally {
                                    if (entry != null) {
                                        entry.release();
                                    }
                                }
                            }
                        }, null);
            }).exceptionally(ex -> {
                // If the exception is not redirect exception we need to log it.
                if (!isRedirectException(ex)) {
                    log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}",
                            clientAppId(), ledgerId, entryId, topicName, ex);
                }
                resumeAsyncResponseExceptionally(asyncResponse, ex);
                return null;
            });
}
/**
 * Finds the id of a message on this topic by publish timestamp.
 *
 * <p>The search is delegated to the managed ledger's {@code asyncFindPosition} with a predicate that is
 * true for entries published earlier than {@code timestamp}. The returned future fails with
 * {@code METHOD_NOT_ALLOWED} for partitioned or non-persistent topics.
 *
 * @param timestamp     publish-time threshold handed to the search predicate
 * @param authoritative forwarded to the metadata and ownership lookups
 * @return a future holding the located message id, or {@code null} when no position was found
 */
protected CompletableFuture<MessageId> internalGetMessageIdByTimestampAsync(long timestamp, boolean authoritative) {
    CompletableFuture<Void> ownershipCheck = topicName.isGlobal()
            ? validateGlobalNamespaceOwnershipAsync(namespaceName)
            : CompletableFuture.completedFuture(null);

    return ownershipCheck.thenCompose(__ -> {
        CompletableFuture<Void> partitionCheck;
        if (!topicName.isPartitioned()) {
            // Only a non-partition name can still turn out to be a partitioned topic.
            partitionCheck = getPartitionedTopicMetadataAsync(topicName, authoritative, false)
                    .thenAccept(metadata -> {
                        if (metadata.partitions > 0) {
                            throw new RestException(Status.METHOD_NOT_ALLOWED,
                                    "Get message ID by timestamp on a partitioned topic is not allowed, "
                                            + "please try do it on specific topic partition");
                        }
                    });
        } else {
            partitionCheck = CompletableFuture.completedFuture(null);
        }
        return partitionCheck;
    }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES))
            .thenCompose(__ -> getTopicReferenceAsync(topicName))
            .thenCompose(topic -> {
                boolean persistent = topic instanceof PersistentTopic;
                if (!persistent) {
                    log.error("[{}] Not supported operation of non-persistent topic {} ", clientAppId(), topicName);
                    throw new RestException(Status.METHOD_NOT_ALLOWED,
                            "Get message ID by timestamp on a non-persistent topic is not allowed");
                }
                ManagedLedger managedLedger = ((PersistentTopic) topic).getManagedLedger();
                return managedLedger.asyncFindPosition(entry -> {
                    try {
                        long publishedAt = Commands.getEntryTimestamp(entry.getDataBuffer());
                        return MessageImpl.isEntryPublishedEarlierThan(publishedAt, timestamp);
                    } catch (Exception e) {
                        log.error("[{}] Error deserializing message for message position find", topicName, e);
                        // An unreadable entry is reported as "not earlier".
                        return false;
                    } finally {
                        entry.release();
                    }
                }).thenApply(position -> {
                    if (position != null) {
                        return new MessageIdImpl(position.getLedgerId(), position.getEntryId(),
                                topicName.getPartitionIndex());
                    }
                    return null;
                });
            });
}
/**
 * Peeks the N-th message of a subscription (or replicator cursor) without consuming it.
 *
 * <p>The returned future fails with {@code METHOD_NOT_ALLOWED} for partitioned or non-persistent topics
 * and with {@code NOT_FOUND} when building the response raises a {@code NullPointerException}.
 *
 * @param subName         subscription name; names starting with the topic's replicator prefix are
 *                        resolved to a replicator instead of a subscription
 * @param messagePosition position handed to {@code peekNthMessage}
 * @param authoritative   forwarded to the metadata and ownership lookups
 * @return a future holding the response built from the peeked entry
 */
protected CompletableFuture<Response> internalPeekNthMessageAsync(String subName, int messagePosition,
        boolean authoritative) {
    CompletableFuture<Void> partitionCheck;
    if (topicName.isPartitioned()) {
        // If the topic name is a partition name, no need to get partition topic metadata again
        partitionCheck = CompletableFuture.completedFuture(null);
    } else {
        partitionCheck = getPartitionedTopicMetadataAsync(topicName, authoritative, false)
                .thenAccept(topicMetadata -> {
                    if (topicMetadata.partitions > 0) {
                        throw new RestException(Status.METHOD_NOT_ALLOWED,
                                "Peek messages on a partitioned topic is not allowed");
                    }
                });
    }
    return partitionCheck.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES, subName))
            .thenCompose(__ -> getTopicReferenceAsync(topicName))
            .thenCompose(topic -> {
                if (!(topic instanceof PersistentTopic)) {
                    log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(),
                            topicName, subName);
                    throw new RestException(Status.METHOD_NOT_ALLOWED,
                            "Peek messages on a non-persistent topic is not allowed");
                }
                PersistentTopic persistentTopic = (PersistentTopic) topic;
                CompletableFuture<Entry> peeked;
                if (subName.startsWith(persistentTopic.getReplicatorPrefix())) {
                    PersistentReplicator replicator = getReplicatorReference(subName, persistentTopic);
                    peeked = replicator.peekNthMessage(messagePosition);
                } else {
                    PersistentSubscription subscription =
                            (PersistentSubscription) getSubscriptionReference(subName, persistentTopic);
                    peeked = subscription.peekNthMessage(messagePosition);
                }
                return peeked;
            }).thenApply(entry -> {
                try {
                    return generateResponseWithEntry(entry);
                } catch (NullPointerException npe) {
                    throw new RestException(Status.NOT_FOUND, "Message not found");
                } catch (Exception exception) {
                    log.error("[{}] Failed to peek message at position {} from {} {}", clientAppId(),
                            messagePosition, topicName, subName, exception);
                    throw new RestException(exception);
                } finally {
                    // The entry is released whether or not a response could be built.
                    if (entry != null) {
                        entry.release();
                    }
                }
            });
}
/**
 * Reads a single message of this topic, addressed relative to the start or the end of the topic.
 *
 * <p>{@code messagePosition} is clamped to a minimum of 1 and a {@code null} {@code initialPosition} is
 * treated as {@code "latest"}. The returned future fails with {@code METHOD_NOT_ALLOWED} for partitioned
 * or non-persistent topics.
 *
 * @param initialPosition {@code "earliest"} to count from the first position; any other value counts
 *                        back from the total number of entries
 * @param messagePosition 1-based offset relative to {@code initialPosition}
 * @param authoritative   forwarded to the ownership and metadata lookups
 * @return a future holding the response built from the entry that was read
 */
protected CompletableFuture<Response> internalExamineMessageAsync(String initialPosition, long messagePosition,
        boolean authoritative) {
    final long effectivePosition = Math.max(messagePosition, 1L);
    final String anchor = initialPosition != null ? initialPosition : "latest";

    CompletableFuture<Void> checks = topicName.isGlobal()
            ? validateGlobalNamespaceOwnershipAsync(namespaceName)
            : CompletableFuture.completedFuture(null);
    checks = checks.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative));
    if (!topicName.isPartitioned()) {
        checks = checks.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
                .thenAccept(partitionedTopicMetadata -> {
                    if (partitionedTopicMetadata.partitions > 0) {
                        throw new RestException(Status.METHOD_NOT_ALLOWED,
                                "Examine messages on a partitioned topic is not allowed, "
                                        + "please try examine message on specific topic partition");
                    }
                });
    }
    return checks.thenCompose(__ -> getTopicReferenceAsync(topicName))
            .thenCompose(topic -> {
                if (!(topic instanceof PersistentTopic)) {
                    log.error("[{}] Not supported operation of non-persistent topic {} ", clientAppId(), topicName);
                    throw new RestException(Status.METHOD_NOT_ALLOWED,
                            "Examine messages on a non-persistent topic is not allowed");
                }
                try {
                    PersistentTopic persistentTopic = (PersistentTopic) topic;
                    long entryCount = persistentTopic.getNumberOfEntries();
                    PositionImpl firstPosition = persistentTopic.getFirstPosition();
                    // From the start the offset is used as-is; otherwise it is counted back from the end.
                    long entriesToSkip;
                    if ("earliest".equals(anchor)) {
                        entriesToSkip = effectivePosition;
                    } else {
                        entriesToSkip = entryCount - effectivePosition + 1;
                    }
                    CompletableFuture<Entry> readResult = new CompletableFuture<>();
                    PositionImpl readPosition = persistentTopic.getPositionAfterN(firstPosition, entriesToSkip);
                    persistentTopic.asyncReadEntry(readPosition, new AsyncCallbacks.ReadEntryCallback() {
                        @Override
                        public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                            readResult.completeExceptionally(exception);
                        }

                        @Override
                        public void readEntryComplete(Entry entry, Object ctx) {
                            readResult.complete(entry);
                        }
                    }, null);
                    return readResult;
                } catch (ManagedLedgerException exception) {
                    log.error("[{}] Failed to examine message at position {} from {} due to {}", clientAppId(),
                            messagePosition,
                            topicName, exception);
                    throw new RestException(exception);
                }
            }).thenApply(entry -> {
                try {
                    return generateResponseWithEntry(entry);
                } catch (IOException exception) {
                    throw new RestException(exception);
                } finally {
                    if (entry != null) {
                        entry.release();
                    }
                }
            });
}
/**
 * Builds the REST response for a single entry: message metadata is exposed as {@code X-Pulsar-*}
 * headers and the decoded payload is streamed as the response body.
 *
 * <p>This method does not release {@code entry}; the callers in this class release it themselves. The
 * payload is copied into a heap buffer that is released once the body has been written, whether or not
 * the write succeeded.
 *
 * @param entry the entry to expose; must not be {@code null}
 * @return a response carrying the metadata headers and the payload stream
 * @throws IOException declared for the metadata parsing / payload decoding performed here
 */
private Response generateResponseWithEntry(Entry entry) throws IOException {
    checkNotNull(entry);
    PositionImpl pos = (PositionImpl) entry.getPosition();
    ByteBuf metadataAndPayload = entry.getDataBuffer();
    // Captured before the metadata is parsed; used below to derive the batch size header.
    long totalSize = metadataAndPayload.readableBytes();
    BrokerEntryMetadata brokerEntryMetadata = Commands.peekBrokerEntryMetadataIfExist(metadataAndPayload);
    MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
    ResponseBuilder responseBuilder = Response.ok();
    responseBuilder.header("X-Pulsar-Message-ID", pos.toString());
    for (KeyValue keyValue : metadata.getPropertiesList()) {
        responseBuilder.header("X-Pulsar-PROPERTY-" + keyValue.getKey(), keyValue.getValue());
    }
    if (brokerEntryMetadata != null) {
        if (brokerEntryMetadata.hasBrokerTimestamp()) {
            responseBuilder.header("X-Pulsar-Broker-Entry-METADATA-timestamp",
                    DateFormatter.format(brokerEntryMetadata.getBrokerTimestamp()));
        }
        if (brokerEntryMetadata.hasIndex()) {
            responseBuilder.header("X-Pulsar-Broker-Entry-METADATA-index", brokerEntryMetadata.getIndex());
        }
    }
    if (metadata.hasPublishTime()) {
        responseBuilder.header("X-Pulsar-publish-time", DateFormatter.format(metadata.getPublishTime()));
    }
    if (metadata.hasEventTime()) {
        responseBuilder.header("X-Pulsar-event-time", DateFormatter.format(metadata.getEventTime()));
    }
    if (metadata.hasDeliverAtTime()) {
        responseBuilder.header("X-Pulsar-deliver-at-time", DateFormatter.format(metadata.getDeliverAtTime()));
    }
    if (metadata.hasNumMessagesInBatch()) {
        responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch());
        responseBuilder.header("X-Pulsar-batch-size", totalSize
                - metadata.getSerializedSize());
    }
    if (metadata.hasNullValue()) {
        responseBuilder.header("X-Pulsar-null-value", metadata.isNullValue());
    }
    // Chunk information is published twice: here under the PROPERTY-style names and further below
    // under the plain metadata names.
    if (metadata.hasNumChunksFromMsg()) {
        responseBuilder.header("X-Pulsar-PROPERTY-TOTAL-CHUNKS", Integer.toString(metadata.getNumChunksFromMsg()));
        responseBuilder.header("X-Pulsar-PROPERTY-CHUNK-ID", Integer.toString(metadata.getChunkId()));
    }
    responseBuilder.header("X-Pulsar-Is-Encrypted", metadata.getEncryptionKeysCount() > 0);
    if (metadata.hasProducerName()) {
        responseBuilder.header("X-Pulsar-producer-name", metadata.getProducerName());
    }
    if (metadata.hasSequenceId()) {
        responseBuilder.header("X-Pulsar-sequence-id", metadata.getSequenceId());
    }
    if (metadata.hasReplicatedFrom()) {
        responseBuilder.header("X-Pulsar-replicated-from", metadata.getReplicatedFrom());
    }
    for (String replicatedTo : metadata.getReplicateTosList()) {
        responseBuilder.header("X-Pulsar-replicated-to", replicatedTo);
    }
    if (metadata.hasPartitionKey()) {
        responseBuilder.header("X-Pulsar-partition-key", metadata.getPartitionKey());
    }
    if (metadata.hasCompression()) {
        responseBuilder.header("X-Pulsar-compression", metadata.getCompression());
    }
    if (metadata.hasUncompressedSize()) {
        responseBuilder.header("X-Pulsar-uncompressed-size", metadata.getUncompressedSize());
    }
    if (metadata.hasEncryptionAlgo()) {
        responseBuilder.header("X-Pulsar-encryption-algo", metadata.getEncryptionAlgo());
    }
    for (EncryptionKeys encryptionKeys : metadata.getEncryptionKeysList()) {
        responseBuilder.header("X-Pulsar-Base64-encryption-keys",
                Base64.getEncoder().encodeToString(encryptionKeys.toByteArray()));
    }
    if (metadata.hasEncryptionParam()) {
        responseBuilder.header("X-Pulsar-Base64-encryption-param",
                Base64.getEncoder().encodeToString(metadata.getEncryptionParam()));
    }
    if (metadata.hasSchemaVersion()) {
        responseBuilder.header("X-Pulsar-Base64-schema-version",
                Base64.getEncoder().encodeToString(metadata.getSchemaVersion()));
    }
    if (metadata.hasPartitionKeyB64Encoded()) {
        responseBuilder.header("X-Pulsar-partition-key-b64-encoded", metadata.isPartitionKeyB64Encoded());
    }
    if (metadata.hasOrderingKey()) {
        responseBuilder.header("X-Pulsar-Base64-ordering-key",
                Base64.getEncoder().encodeToString(metadata.getOrderingKey()));
    }
    if (metadata.hasMarkerType()) {
        responseBuilder.header("X-Pulsar-marker-type", metadata.getMarkerType());
    }
    if (metadata.hasTxnidLeastBits()) {
        responseBuilder.header("X-Pulsar-txnid-least-bits", metadata.getTxnidLeastBits());
    }
    if (metadata.hasTxnidMostBits()) {
        responseBuilder.header("X-Pulsar-txnid-most-bits", metadata.getTxnidMostBits());
    }
    if (metadata.hasHighestSequenceId()) {
        responseBuilder.header("X-Pulsar-highest-sequence-id", metadata.getHighestSequenceId());
    }
    if (metadata.hasUuid()) {
        responseBuilder.header("X-Pulsar-uuid", metadata.getUuid());
    }
    if (metadata.hasNumChunksFromMsg()) {
        responseBuilder.header("X-Pulsar-num-chunks-from-msg", metadata.getNumChunksFromMsg());
    }
    if (metadata.hasTotalChunkMsgSize()) {
        responseBuilder.header("X-Pulsar-total-chunk-msg-size", metadata.getTotalChunkMsgSize());
    }
    if (metadata.hasChunkId()) {
        responseBuilder.header("X-Pulsar-chunk-id", metadata.getChunkId());
    }
    if (metadata.hasNullPartitionKey()) {
        responseBuilder.header("X-Pulsar-null-partition-key", metadata.isNullPartitionKey());
    }
    // Decode if needed
    CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
    ByteBuf uncompressedPayload = codec.decode(metadataAndPayload, metadata.getUncompressedSize());
    // Copy into a heap buffer for output stream compatibility
    final ByteBuf data;
    try {
        int payloadSize = uncompressedPayload.readableBytes();
        data = PulsarByteBufAllocator.DEFAULT.heapBuffer(payloadSize, payloadSize);
        data.writeBytes(uncompressedPayload);
    } finally {
        // Release the decoded buffer even when allocating or filling the copy fails.
        uncompressedPayload.release();
    }
    StreamingOutput stream = output -> {
        try {
            output.write(data.array(), data.arrayOffset(), data.readableBytes());
        } finally {
            // Release the copy even when the write fails (e.g. the client disconnected).
            data.release();
        }
    };
    return responseBuilder.entity(stream).build();
}
/**
 * Returns offline backlog statistics for this topic without loading it.
 *
 * <p>A cached value from the broker service is reused while it is younger than
 * {@code OFFLINE_TOPIC_STAT_TTL_MINS}; otherwise a fresh estimate is computed from the managed ledger
 * configuration and stored in the cache.
 *
 * @param authoritative not read by this method
 * @return a future holding the cached or freshly estimated statistics
 */
protected CompletableFuture<PersistentOfflineTopicStats> internalGetBacklogAsync(boolean authoritative) {
    CompletableFuture<Void> ownershipCheck = topicName.isGlobal()
            ? validateGlobalNamespaceOwnershipAsync(namespaceName)
            : CompletableFuture.completedFuture(null);
    // Validate that namespace exists, throw 404 if it doesn't exist
    // note that we do not want to load the topic and hence skip authorization check
    return ownershipCheck.thenCompose(__ -> namespaceResources().getPoliciesAsync(namespaceName))
            .thenCompose(__ -> {
                PersistentOfflineTopicStats cached =
                        pulsar().getBrokerService().getOfflineTopicStat(topicName);
                if (cached != null) {
                    // offline topic stat has a cost - so use cached value until TTL
                    long ageMs = System.currentTimeMillis() - cached.statGeneratedAt.getTime();
                    if (TimeUnit.MILLISECONDS.toMinutes(ageMs) < OFFLINE_TOPIC_STAT_TTL_MINS) {
                        return CompletableFuture.completedFuture(cached);
                    }
                }
                return pulsar().getBrokerService().getManagedLedgerConfig(topicName)
                        .thenApply(config -> {
                            ManagedLedgerOfflineBacklog backlogEstimator =
                                    new ManagedLedgerOfflineBacklog(config.getDigestType(), config.getPassword(),
                                            pulsar().getAdvertisedAddress(), false);
                            try {
                                PersistentOfflineTopicStats estimated =
                                        backlogEstimator.estimateUnloadedTopicBacklog(
                                                (ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory(),
                                                topicName);
                                pulsar().getBrokerService().cacheOfflineTopicStats(topicName, estimated);
                                return estimated;
                            } catch (Exception e) {
                                throw new RestException(e);
                            }
                        });
            });
}
/**
 * Returns the backlog quotas configured for this topic, keyed by quota type.
 *
 * <p>When {@code applied} is set and the topic has no quotas of its own, the namespace quota map is
 * returned instead; if that is empty too, one entry per quota type is filled in from
 * {@code namespaceBacklogQuota}.
 *
 * @param applied  whether to fall back to namespace-level values when the topic defines none
 * @param isGlobal forwarded to the topic policies lookup
 * @return a future holding a map that is private to the caller
 */
protected CompletableFuture<Map<BacklogQuota.BacklogQuotaType, BacklogQuota>> internalGetBacklogQuota(
        boolean applied, boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenApply(op -> {
                Map<BacklogQuota.BacklogQuotaType, BacklogQuota> topicQuotas = new HashMap<>();
                op.map(TopicPolicies::getBackLogQuotaMap)
                        .ifPresent(map -> map.forEach((key, value) ->
                                topicQuotas.put(BacklogQuota.BacklogQuotaType.valueOf(key), value)));
                if (!applied || !topicQuotas.isEmpty()) {
                    return topicQuotas;
                }
                // Work on a copy: the defaults added below must not be written into the map owned by
                // the namespace policies object.
                // NOTE(review): that object may be shared/cached by getNamespacePolicies - confirm.
                Map<BacklogQuota.BacklogQuotaType, BacklogQuota> namespaceQuotas =
                        new HashMap<>(getNamespacePolicies(namespaceName).backlog_quota_map);
                if (namespaceQuotas.isEmpty()) {
                    for (BacklogQuota.BacklogQuotaType backlogQuotaType : BacklogQuota.BacklogQuotaType.values()) {
                        namespaceQuotas.put(
                                backlogQuotaType,
                                namespaceBacklogQuota(namespaceName, backlogQuotaType)
                        );
                    }
                }
                return namespaceQuotas;
            });
}
/**
 * Resumes {@code asyncResponse} with the backlog size of this topic measured from {@code messageId}.
 *
 * <p>A ledger id of {@code -1} selects the managed ledger's total size; any other id selects the
 * estimated backlog size from that position. Partitioned topics are answered with
 * {@code METHOD_NOT_ALLOWED} and a missing topic with {@code NOT_FOUND}.
 *
 * @param asyncResponse the pending REST response to resume
 * @param messageId     ledger/entry coordinates from which the backlog is measured
 * @param authoritative forwarded to the metadata and ownership lookups
 */
protected void internalGetBacklogSizeByMessageId(AsyncResponse asyncResponse,
        MessageIdImpl messageId, boolean authoritative) {
    CompletableFuture<Void> ownershipCheck = topicName.isGlobal()
            ? validateGlobalNamespaceOwnershipAsync(namespaceName)
            : CompletableFuture.completedFuture(null);
    ownershipCheck.thenAccept(__ -> {
        getPartitionedTopicMetadataAsync(topicName, authoritative, false)
                .thenAccept(partitionMetadata -> {
                    boolean partitionedParent = !topicName.isPartitioned() && partitionMetadata.partitions > 0;
                    if (partitionedParent) {
                        log.warn("[{}] Not supported calculate backlog size operation on partitioned-topic {}",
                                clientAppId(), topicName);
                        asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
                                "calculate backlog size is not allowed for partitioned-topic"));
                        return;
                    }
                    validateTopicOwnershipAsync(topicName, authoritative)
                            .thenCompose(unused -> validateTopicOperationAsync(topicName,
                                    TopicOperation.GET_BACKLOG_SIZE))
                            .thenCompose(unused -> getTopicReferenceAsync(topicName))
                            .thenAccept(t -> {
                                PersistentTopic persistentTopic = (PersistentTopic) t;
                                PositionImpl startPosition = new PositionImpl(messageId.getLedgerId(),
                                        messageId.getEntryId());
                                if (persistentTopic == null) {
                                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
                                            getTopicNotFoundErrorMessage(topicName.toString())));
                                    return;
                                }
                                ManagedLedgerImpl managedLedger =
                                        (ManagedLedgerImpl) persistentTopic.getManagedLedger();
                                if (messageId.getLedgerId() != -1) {
                                    asyncResponse.resume(managedLedger.getEstimatedBacklogSize(startPosition));
                                } else {
                                    asyncResponse.resume(managedLedger.getTotalSize());
                                }
                            }).exceptionally(ex -> {
                                // If the exception is not redirect exception we need to log it.
                                if (!isRedirectException(ex)) {
                                    log.error("[{}] Failed to get backlog size for topic {}", clientAppId(),
                                            topicName, ex);
                                }
                                resumeAsyncResponseExceptionally(asyncResponse, ex);
                                return null;
                            });
                }).exceptionally(ex -> {
                    // If the exception is not redirect exception we need to log it.
                    if (!isRedirectException(ex)) {
                        log.error("[{}] Failed to get backlog size for topic {}", clientAppId(), topicName, ex);
                    }
                    resumeAsyncResponseExceptionally(asyncResponse, ex);
                    return null;
                });
    }).exceptionally(ex -> {
        // If the exception is not redirect exception we need to log it.
        if (!isRedirectException(ex)) {
            log.error("[{}] Failed to validate global namespace ownership to get backlog size for topic "
                    + "{}", clientAppId(), topicName, ex);
        }
        resumeAsyncResponseExceptionally(asyncResponse, ex);
        return null;
    });
}
/**
 * Sets or removes a topic-level backlog quota.
 *
 * <p>A {@code null} {@code backlogQuota} removes the quota of the given type. The quota is first
 * checked against the effective retention policy via {@code checkBacklogQuota}; on a conflict the
 * returned future fails with {@code PRECONDITION_FAILED}.
 *
 * @param backlogQuotaType quota type to change; {@code null} means {@code destination_storage}
 * @param backlogQuota     the quota to store, or {@code null} to remove the existing one
 * @param isGlobal         forwarded to the policies lookup and stored on the updated policies
 * @return a future that completes once the topic policies have been updated
 */
protected CompletableFuture<Void> internalSetBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType,
        BacklogQuotaImpl backlogQuota, boolean isGlobal) {
    BacklogQuota.BacklogQuotaType finalBacklogQuotaType = backlogQuotaType == null
            ? BacklogQuota.BacklogQuotaType.destination_storage : backlogQuotaType;
    return validateTopicPolicyOperationAsync(topicName, PolicyName.BACKLOG, PolicyOperation.WRITE)
            // Use the asynchronous read-only check, as the other policy setters in this class do,
            // instead of running the synchronous variant inside the future chain.
            .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
            .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal))
            .thenCompose(op -> {
                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
                return getRetentionPoliciesAsync(topicName, topicPolicies)
                        .thenCompose(retentionPolicies -> {
                            if (!checkBacklogQuota(backlogQuota, retentionPolicies)) {
                                log.warn(
                                        "[{}] Failed to update backlog configuration for topic {}: conflicts with"
                                                + " retention quota",
                                        clientAppId(), topicName);
                                return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
                                        "Backlog Quota exceeds configured retention quota for topic. "
                                                + "Please increase retention quota and retry"));
                            }
                            if (backlogQuota != null) {
                                topicPolicies.getBackLogQuotaMap().put(finalBacklogQuotaType.name(), backlogQuota);
                            } else {
                                topicPolicies.getBackLogQuotaMap().remove(finalBacklogQuotaType.name());
                            }
                            Map<String, BacklogQuotaImpl> backLogQuotaMap = topicPolicies.getBackLogQuotaMap();
                            topicPolicies.setIsGlobal(isGlobal);
                            return pulsar().getTopicPoliciesService()
                                    .updateTopicPoliciesAsync(topicName, topicPolicies)
                                    .thenRun(() -> {
                                        try {
                                            log.info(
                                                    "[{}] Successfully updated backlog quota map: namespace={}, "
                                                            + "topic={}, map={}",
                                                    clientAppId(),
                                                    namespaceName,
                                                    topicName.getLocalName(),
                                                    jsonMapper().writeValueAsString(backLogQuotaMap));
                                        } catch (JsonProcessingException e) {
                                            // The update itself succeeded; only the log rendering failed.
                                            log.warn("[{}] Failed to serialize backlog quota map of topic {}"
                                                    + " for logging", clientAppId(), topicName, e);
                                        }
                                    });
                        });
            });
}
/**
 * Stores the given replication clusters in the topic-level policies.
 *
 * <p>The returned future fails with {@code PRECONDITION_FAILED} when {@code clusterIds} is
 * {@code null} or contains {@code "global"}, and with {@code FORBIDDEN} when a cluster id is unknown.
 * Each cluster is also checked for peer-cluster conflicts and tenant access.
 *
 * @param clusterIds ids of the clusters the topic should replicate to
 * @return a future that completes once the topic policies have been updated
 */
protected CompletableFuture<Void> internalSetReplicationClusters(List<String> clusterIds) {
    return validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, PolicyOperation.WRITE)
            .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
            .thenAccept(__ -> {
                // Reject a missing list explicitly; it would otherwise surface as a
                // NullPointerException from Sets.newHashSet.
                if (clusterIds == null) {
                    throw new RestException(Status.PRECONDITION_FAILED,
                            "The list of replication clusters must not be null");
                }
                Set<String> replicationClusters = Sets.newHashSet(clusterIds);
                if (replicationClusters.contains("global")) {
                    throw new RestException(Status.PRECONDITION_FAILED,
                            "Cannot specify global in the list of replication clusters");
                }
                Set<String> clusters = clusters();
                for (String clusterId : replicationClusters) {
                    if (!clusters.contains(clusterId)) {
                        throw new RestException(Status.FORBIDDEN, "Invalid cluster id: " + clusterId);
                    }
                    validatePeerClusterConflict(clusterId, replicationClusters);
                    validateClusterForTenant(namespaceName.getTenant(), clusterId);
                }
            }).thenCompose(__ ->
                    getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> {
                                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
                                topicPolicies.setReplicationClusters(clusterIds);
                                return pulsar().getTopicPoliciesService()
                                        .updateTopicPoliciesAsync(topicName, topicPolicies)
                                        .thenRun(() -> {
                                            log.info("[{}] Successfully set replication clusters for namespace={}, "
                                                            + "topic={}, clusters={}",
                                                    clientAppId(),
                                                    namespaceName,
                                                    topicName.getLocalName(),
                                                    topicPolicies.getReplicationClusters());
                                        });
                            }
                    ));
}
/**
 * Clears the topic-level replication clusters by storing {@code null} in the topic policies.
 *
 * <p>A fresh {@code TopicPolicies} object is created and written when the topic has none yet.
 *
 * @return a future that completes once the topic policies have been updated
 */
protected CompletableFuture<Void> internalRemoveReplicationClusters() {
    return validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, PolicyOperation.WRITE)
            .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
            .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
            .thenCompose(existing -> {
                TopicPolicies policies = existing.orElseGet(TopicPolicies::new);
                policies.setReplicationClusters(null);
                CompletableFuture<Void> update =
                        pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
                return update.thenRun(() ->
                        log.info("[{}] Successfully set replication clusters for namespace={}, "
                                        + "topic={}, clusters={}",
                                clientAppId(),
                                namespaceName,
                                topicName.getLocalName(),
                                policies.getReplicationClusters()));
            });
}
/**
 * Returns the deduplication flag for this topic.
 *
 * <p>The topic-level value wins when present. Otherwise, if {@code applied} is set, the namespace value
 * is used and, failing that, the broker default; without {@code applied} the result is {@code null}.
 *
 * @param applied  whether to fall back to namespace and broker values
 * @param isGlobal forwarded to the topic policies lookup
 * @return a future holding the flag, possibly {@code null}
 */
protected CompletableFuture<Boolean> internalGetDeduplication(boolean applied, boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenApply(op -> {
                Optional<Boolean> topicLevel = op.map(TopicPolicies::getDeduplicationEnabled);
                if (topicLevel.isPresent()) {
                    return topicLevel.get();
                }
                if (!applied) {
                    return null;
                }
                Boolean namespaceLevel = getNamespacePolicies(namespaceName).deduplicationEnabled;
                return namespaceLevel != null ? namespaceLevel : config().isBrokerDeduplicationEnabled();
            });
}
/**
 * Stores the topic-level deduplication flag, creating the topic policies when none exist yet.
 *
 * @param enabled  the flag to store; passed through unchanged, so {@code null} is stored as-is
 * @param isGlobal forwarded to the policies lookup and stored on the updated policies
 * @return a future that completes once the topic policies have been updated
 */
protected CompletableFuture<Void> internalSetDeduplication(Boolean enabled, boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenCompose(existing -> {
                TopicPolicies policies = existing.isPresent() ? existing.get() : new TopicPolicies();
                policies.setDeduplicationEnabled(enabled);
                policies.setIsGlobal(isGlobal);
                return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
            });
}
/**
 * Stores the topic-level message TTL.
 *
 * @param ttlInSecond TTL in seconds; {@code null} is stored as-is, negative values are rejected with
 *                    {@code PRECONDITION_FAILED}
 * @param isGlobal    forwarded to the policies lookup and stored on the updated policies
 * @return a future that completes once the topic policies have been updated
 */
protected CompletableFuture<Void> internalSetMessageTTL(Integer ttlInSecond, boolean isGlobal) {
    // Validate message ttl value.
    boolean negativeTtl = ttlInSecond != null && ttlInSecond < 0;
    if (negativeTtl) {
        return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
                "Invalid value for message TTL"));
    }
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenCompose(existing -> {
                TopicPolicies policies = existing.orElseGet(TopicPolicies::new);
                policies.setMessageTTLInSeconds(ttlInSecond);
                policies.setIsGlobal(isGlobal);
                return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies)
                        .thenRun(() -> {
                            log.info("[{}] Successfully set topic message ttl: namespace={}, topic={}, ttl={}",
                                    clientAppId(), namespaceName, topicName.getLocalName(), ttlInSecond);
                        });
            });
}
/**
 * Resolves the retention policies that apply to a topic: the topic-level value when set, otherwise
 * the {@code retention_policies} of the topic's namespace.
 *
 * @param topicName     topic whose namespace is consulted as a fallback
 * @param topicPolicies topic-level policies to look at first
 * @return a future holding the resolved retention policies
 */
private CompletableFuture<RetentionPolicies> getRetentionPoliciesAsync(TopicName topicName,
        TopicPolicies topicPolicies) {
    RetentionPolicies topicLevel = topicPolicies.getRetentionPolicies();
    if (topicLevel == null) {
        return getNamespacePoliciesAsync(topicName.getNamespaceObject())
                .thenApply(namespacePolicies -> namespacePolicies.retention_policies);
    }
    return CompletableFuture.completedFuture(topicLevel);
}
/**
 * Returns the retention policies for this topic.
 *
 * <p>The topic-level value wins when present. Otherwise, if {@code applied} is set, the namespace value
 * is used and, failing that, a policy built from the broker's default retention time and size; without
 * {@code applied} the result is {@code null}.
 *
 * @param applied  whether to fall back to namespace and broker values
 * @param isGlobal forwarded to the topic policies lookup
 * @return a future holding the retention policies, possibly {@code null}
 */
protected CompletableFuture<RetentionPolicies> internalGetRetention(boolean applied, boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenApply(op -> {
                Optional<RetentionPolicies> topicLevel = op.map(TopicPolicies::getRetentionPolicies);
                if (topicLevel.isPresent()) {
                    return topicLevel.get();
                }
                if (!applied) {
                    return null;
                }
                RetentionPolicies namespaceLevel = getNamespacePolicies(namespaceName).retention_policies;
                if (namespaceLevel != null) {
                    return namespaceLevel;
                }
                return new RetentionPolicies(
                        config().getDefaultRetentionTimeInMinutes(), config().getDefaultRetentionSizeInMB());
            });
}
/**
 * Stores {@code retention} in the topic-level policies after checking it against every backlog quota
 * type.
 *
 * <p>For each quota type the topic-level quota is used when present, otherwise the namespace quota.
 * If {@code checkBacklogQuota} rejects any combination, nothing is written.
 *
 * @param retention retention policies to store; {@code null} makes this a no-op
 * @param isGlobal  scope flag used both for loading and for tagging the stored policies
 * @return a future completing when the policies were written; it fails with a
 *         {@code PRECONDITION_FAILED} {@link RestException} when the retention conflicts with a quota
 */
protected CompletableFuture<Void> internalSetRetention(RetentionPolicies retention, boolean isGlobal) {
    if (retention == null) {
        return CompletableFuture.completedFuture(null);
    }
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        TopicPolicies updated = existing.orElseGet(TopicPolicies::new);
        for (BacklogQuota.BacklogQuotaType quotaType : BacklogQuota.BacklogQuotaType.values()) {
            BacklogQuota quota = updated.getBackLogQuotaMap().get(quotaType.name());
            if (quota == null) {
                // No topic-level quota for this type: compare against the namespace quota instead.
                quota = getNamespacePolicies(topicName.getNamespaceObject()).backlog_quota_map.get(quotaType);
            }
            if (checkBacklogQuota(quota, retention)) {
                continue;
            }
            log.warn(
                    "[{}] Failed to update retention quota configuration for topic {}: "
                            + "conflicts with retention quota",
                    clientAppId(), topicName);
            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
                    "Retention Quota must exceed configured backlog quota for topic. "
                            + "Please increase retention quota and retry"));
        }
        updated.setRetentionPolicies(retention);
        updated.setIsGlobal(isGlobal);
        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, updated);
    });
}
/**
 * Clears the topic-level retention policies of {@code topicName}.
 *
 * @param isGlobal scope flag used both for loading and for tagging the stored policies
 * @return a future completing when the policies were written, or immediately when the topic has no
 *         policies at all
 */
protected CompletableFuture<Void> internalRemoveRetention(boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
        .thenCompose(op -> {
            if (!op.isPresent()) {
                return CompletableFuture.completedFuture(null);
            }
            TopicPolicies topicPolicies = op.get();
            topicPolicies.setRetentionPolicies(null);
            // Tag the written policies with the requested scope, as the other remove operations
            // in this class (persistence, max producers, max consumers) do.
            topicPolicies.setIsGlobal(isGlobal);
            return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
        });
}
/**
 * Reads the persistence policies of {@code topicName}.
 *
 * @param applied  when {@code true} and no topic-level value exists, fall back to the namespace value
 *                 and then to the managed-ledger defaults of the broker configuration; when
 *                 {@code false} the result is {@code null} in that case
 * @param isGlobal scope flag used when loading the topic policies
 * @return a future holding the resolved persistence policies, possibly {@code null}
 */
protected CompletableFuture<PersistencePolicies> internalGetPersistence(boolean applied, boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenApply(topicPoliciesOpt -> {
        Optional<PersistencePolicies> topicLevel = topicPoliciesOpt.map(TopicPolicies::getPersistence);
        if (topicLevel.isPresent()) {
            return topicLevel.get();
        }
        if (!applied) {
            return null;
        }
        PersistencePolicies nsLevel = getNamespacePolicies(namespaceName).persistence;
        if (nsLevel != null) {
            return nsLevel;
        }
        // Nothing configured on topic or namespace: build the value from broker defaults.
        return new PersistencePolicies(
                pulsar().getConfiguration().getManagedLedgerDefaultEnsembleSize(),
                pulsar().getConfiguration().getManagedLedgerDefaultWriteQuorum(),
                pulsar().getConfiguration().getManagedLedgerDefaultAckQuorum(),
                pulsar().getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
    });
}
/**
 * Validates {@code persistencePolicies} and stores them in the topic-level policies.
 *
 * @param persistencePolicies policies to store; checked synchronously by
 *                            {@code validatePersistencePolicies} before anything is loaded
 * @param isGlobal            scope flag used both for loading and for tagging the stored policies
 * @return a future completing when the policies were written
 */
protected CompletableFuture<Void> internalSetPersistence(PersistencePolicies persistencePolicies,
        boolean isGlobal) {
    validatePersistencePolicies(persistencePolicies);
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        TopicPolicies updated = existing.isPresent() ? existing.get() : new TopicPolicies();
        updated.setPersistence(persistencePolicies);
        updated.setIsGlobal(isGlobal);
        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, updated);
    });
}
/**
 * Clears the topic-level persistence policies of {@code topicName}.
 *
 * @param isGlobal scope flag used both for loading and for tagging the stored policies
 * @return a future completing when the policies were written, or immediately when the topic has no
 *         policies at all
 */
protected CompletableFuture<Void> internalRemovePersistence(boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        if (existing.isPresent()) {
            TopicPolicies current = existing.get();
            current.setPersistence(null);
            current.setIsGlobal(isGlobal);
            return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, current);
        }
        // No policies stored for the topic, so there is nothing to clear.
        return CompletableFuture.completedFuture(null);
    });
}
/**
 * Stores the topic-level maximum message size.
 *
 * @param maxMessageSize value to store; {@code null} is passed through unchanged, otherwise it must be
 *                       within {@code [0, config().getMaxMessageSize()]}
 * @param isGlobal       scope flag used both for loading and for tagging the stored policies
 * @return a future completing when the policies were written
 * @throws RestException synchronously, with {@code PRECONDITION_FAILED}, when the value is out of range
 */
protected CompletableFuture<Void> internalSetMaxMessageSize(Integer maxMessageSize, boolean isGlobal) {
    if (maxMessageSize != null) {
        boolean outOfRange = maxMessageSize < 0 || maxMessageSize > config().getMaxMessageSize();
        if (outOfRange) {
            throw new RestException(Status.PRECONDITION_FAILED,
                    "topic-level maxMessageSize must be greater than or equal to 0 "
                            + "and must be smaller than that in the broker-level");
        }
    }
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        TopicPolicies updated = existing.isPresent() ? existing.get() : new TopicPolicies();
        updated.setMaxMessageSize(maxMessageSize);
        updated.setIsGlobal(isGlobal);
        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, updated);
    });
}
/**
 * Reads the topic-level maximum message size.
 *
 * @param isGlobal scope flag used when loading the topic policies
 * @return a future holding the configured value, or an empty {@link Optional} when the topic has no
 *         policies or the value is unset
 */
protected CompletableFuture<Optional<Integer>> internalGetMaxMessageSize(boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenApply(policiesOpt -> policiesOpt.map(policies -> policies.getMaxMessageSize()));
}
/**
 * Reads the maximum number of producers allowed on {@code topicName}.
 *
 * @param applied  when {@code true} and no topic-level value exists, fall back to the namespace value
 *                 and then to the broker configuration; when {@code false} the result is {@code null}
 *                 in that case
 * @param isGlobal scope flag used when loading the topic policies
 * @return a future holding the resolved limit, possibly {@code null}
 */
protected CompletableFuture<Integer> internalGetMaxProducers(boolean applied, boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenApply(topicPoliciesOpt -> {
        Optional<Integer> topicLevel = topicPoliciesOpt.map(TopicPolicies::getMaxProducerPerTopic);
        if (topicLevel.isPresent()) {
            return topicLevel.get();
        }
        if (!applied) {
            return null;
        }
        Integer nsLevel = getNamespacePolicies(namespaceName).max_producers_per_topic;
        if (nsLevel != null) {
            return nsLevel;
        }
        return config().getMaxProducersPerTopic();
    });
}
/**
 * Stores the topic-level maximum number of producers.
 *
 * @param maxProducers value to store; {@code null} is passed through unchanged, negative values are
 *                     rejected
 * @param isGlobal     scope flag used both for loading and for tagging the stored policies
 * @return a future completing when the policies were written
 * @throws RestException synchronously, with {@code PRECONDITION_FAILED}, when the value is negative
 */
protected CompletableFuture<Void> internalSetMaxProducers(Integer maxProducers, boolean isGlobal) {
    final boolean negative = maxProducers != null && maxProducers < 0;
    if (negative) {
        throw new RestException(Status.PRECONDITION_FAILED, "maxProducers must be 0 or more");
    }
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        TopicPolicies updated = existing.isPresent() ? existing.get() : new TopicPolicies();
        updated.setMaxProducerPerTopic(maxProducers);
        updated.setIsGlobal(isGlobal);
        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, updated);
    });
}
/**
 * Reads the topic-level maximum number of subscriptions.
 *
 * @param isGlobal scope flag used when loading the topic policies
 * @return a future holding the configured value, or an empty {@link Optional} when the topic has no
 *         policies or the value is unset
 */
protected CompletableFuture<Optional<Integer>> internalGetMaxSubscriptionsPerTopic(boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenApply(policiesOpt -> policiesOpt.map(policies -> policies.getMaxSubscriptionsPerTopic()));
}
/**
 * Stores the topic-level maximum number of subscriptions.
 *
 * @param maxSubscriptionsPerTopic value to store; {@code null} is passed through unchanged, negative
 *                                 values are rejected
 * @param isGlobal                 scope flag used both for loading and for tagging the stored policies
 * @return a future completing when the policies were written
 * @throws RestException synchronously, with {@code PRECONDITION_FAILED}, when the value is negative
 */
protected CompletableFuture<Void> internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic,
        boolean isGlobal) {
    final boolean negative = maxSubscriptionsPerTopic != null && maxSubscriptionsPerTopic < 0;
    if (negative) {
        throw new RestException(Status.PRECONDITION_FAILED, "maxSubscriptionsPerTopic must be 0 or more");
    }
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        TopicPolicies updated = existing.isPresent() ? existing.get() : new TopicPolicies();
        updated.setMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic);
        updated.setIsGlobal(isGlobal);
        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, updated);
    });
}
/**
 * Reads the replicator dispatch rate of {@code topicName}.
 *
 * @param applied  when {@code true} and no topic-level value exists, fall back to the namespace entry
 *                 for the local cluster name and then to {@code replicatorDispatchRate()}; when
 *                 {@code false} the result is {@code null} in that case
 * @param isGlobal scope flag used when loading the topic policies
 * @return a future holding the resolved dispatch rate, possibly {@code null}
 */
protected CompletableFuture<DispatchRateImpl> internalGetReplicatorDispatchRate(boolean applied, boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenApply(topicPoliciesOpt -> {
        Optional<DispatchRateImpl> topicLevel = topicPoliciesOpt.map(TopicPolicies::getReplicatorDispatchRate);
        if (topicLevel.isPresent()) {
            return topicLevel.get();
        }
        if (!applied) {
            return null;
        }
        // The namespace keeps one replicator rate per cluster; pick the entry of this cluster.
        DispatchRateImpl nsLevel = getNamespacePolicies(namespaceName)
                .replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName());
        if (nsLevel != null) {
            return nsLevel;
        }
        return replicatorDispatchRate();
    });
}
/**
 * Stores the topic-level replicator dispatch rate.
 *
 * @param dispatchRate value to store; passed to the policies without validation here
 * @param isGlobal     scope flag used both for loading and for tagging the stored policies
 * @return a future completing when the policies were written
 */
protected CompletableFuture<Void> internalSetReplicatorDispatchRate(DispatchRateImpl dispatchRate,
        boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        TopicPolicies updated = existing.isPresent() ? existing.get() : new TopicPolicies();
        updated.setReplicatorDispatchRate(dispatchRate);
        updated.setIsGlobal(isGlobal);
        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, updated);
    });
}
/**
 * Common checks run before a topic-policy operation.
 *
 * <p>In order: topic-level policies must be enabled, {@code topicName} must not be a single
 * partition, the global namespace ownership is validated for global topics, the topic must exist,
 * and finally topic ownership is validated (on partition 0 when the topic is partitioned).
 *
 * @param authoritative forwarded to the topic ownership validation
 * @return a future that completes when all checks pass and fails with a {@link RestException}
 *         ({@code METHOD_NOT_ALLOWED}, {@code PRECONDITION_FAILED} or {@code NOT_FOUND}) otherwise
 */
protected CompletableFuture<Void> preValidation(boolean authoritative) {
    if (!config().isTopicLevelPoliciesEnabled()) {
        return FutureUtil.failedFuture(new RestException(Status.METHOD_NOT_ALLOWED,
                "Topic level policies is disabled, to enable the topic level policy and retry."));
    }
    if (topicName.isPartitioned()) {
        return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
                "Not allowed to set/get topic policy for a partition"));
    }
    CompletableFuture<Void> namespaceCheck = topicName.isGlobal()
            ? validateGlobalNamespaceOwnershipAsync(namespaceName)
            : CompletableFuture.completedFuture(null);
    return namespaceCheck
            .thenCompose(__ -> checkTopicExistsAsync(topicName))
            .thenCompose(exists -> {
                if (!exists) {
                    throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString()));
                }
                return getPartitionedTopicMetadataAsync(topicName, false, false)
                        .thenCompose(metadata -> {
                            // For a partitioned topic, ownership is checked on its first partition.
                            TopicName ownershipTarget = metadata.partitions > 0
                                    ? TopicName.get(topicName.toString()
                                            + TopicName.PARTITIONED_TOPIC_SUFFIX + 0)
                                    : topicName;
                            return validateTopicOwnershipAsync(ownershipTarget, authoritative);
                        });
            });
}
/**
 * Clears the topic-level maximum number of producers.
 *
 * @param isGlobal scope flag used both for loading and for tagging the stored policies
 * @return a future completing when the policies were written, or immediately when the topic has no
 *         policies at all
 */
protected CompletableFuture<Void> internalRemoveMaxProducers(boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        if (existing.isPresent()) {
            TopicPolicies current = existing.get();
            current.setMaxProducerPerTopic(null);
            current.setIsGlobal(isGlobal);
            return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, current);
        }
        // No policies stored for the topic, so there is nothing to clear.
        return CompletableFuture.completedFuture(null);
    });
}
/**
 * Reads the maximum number of consumers allowed on {@code topicName}.
 *
 * @param applied  when {@code true} and no topic-level value exists, fall back to the namespace value
 *                 and then to the broker configuration; when {@code false} the result is {@code null}
 *                 in that case
 * @param isGlobal scope flag used when loading the topic policies
 * @return a future holding the resolved limit, possibly {@code null}
 */
protected CompletableFuture<Integer> internalGetMaxConsumers(boolean applied, boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenApply(topicPoliciesOpt -> {
        Optional<Integer> topicLevel = topicPoliciesOpt.map(TopicPolicies::getMaxConsumerPerTopic);
        if (topicLevel.isPresent()) {
            return topicLevel.get();
        }
        if (!applied) {
            return null;
        }
        Integer nsLevel = getNamespacePolicies(namespaceName).max_consumers_per_topic;
        if (nsLevel != null) {
            return nsLevel;
        }
        return config().getMaxConsumersPerTopic();
    });
}
/**
 * Stores the topic-level maximum number of consumers.
 *
 * @param maxConsumers value to store; {@code null} is passed through unchanged, negative values are
 *                     rejected
 * @param isGlobal     scope flag used both for loading and for tagging the stored policies
 * @return a future completing when the policies were written
 * @throws RestException synchronously, with {@code PRECONDITION_FAILED}, when the value is negative
 */
protected CompletableFuture<Void> internalSetMaxConsumers(Integer maxConsumers, boolean isGlobal) {
    final boolean negative = maxConsumers != null && maxConsumers < 0;
    if (negative) {
        throw new RestException(Status.PRECONDITION_FAILED, "maxConsumers must be 0 or more");
    }
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        TopicPolicies updated = existing.isPresent() ? existing.get() : new TopicPolicies();
        updated.setMaxConsumerPerTopic(maxConsumers);
        updated.setIsGlobal(isGlobal);
        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, updated);
    });
}
/**
 * Clears the topic-level maximum number of consumers.
 *
 * @param isGlobal scope flag used both for loading and for tagging the stored policies
 * @return a future completing when the policies were written, or immediately when the topic has no
 *         policies at all
 */
protected CompletableFuture<Void> internalRemoveMaxConsumers(boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        if (existing.isPresent()) {
            TopicPolicies current = existing.get();
            current.setMaxConsumerPerTopic(null);
            current.setIsGlobal(isGlobal);
            return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, current);
        }
        // No policies stored for the topic, so there is nothing to clear.
        return CompletableFuture.completedFuture(null);
    });
}
/**
 * Terminates a non-partitioned persistent topic.
 *
 * <p>System topics, partitioned topics and non-persistent topics are rejected with
 * {@code METHOD_NOT_ALLOWED}. Ownership and the {@code TERMINATE} operation are validated first.
 *
 * @param authoritative forwarded to the ownership validation and the metadata lookup
 * @return a future holding the message id returned by {@code PersistentTopic.terminate()}
 */
protected CompletableFuture<MessageId> internalTerminateAsync(boolean authoritative) {
    if (SystemTopicNames.isSystemTopic(topicName)) {
        RestException notAllowed = new RestException(Status.METHOD_NOT_ALLOWED,
                "Termination of a system topic is not allowed");
        return FutureUtil.failedFuture(notAllowed);
    }
    CompletableFuture<Void> namespaceCheck = topicName.isGlobal()
            ? validateGlobalNamespaceOwnershipAsync(namespaceName)
            : CompletableFuture.completedFuture(null);
    return namespaceCheck
            .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.TERMINATE))
            .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
            .thenAccept(metadata -> {
                if (metadata.partitions > 0) {
                    throw new RestException(Status.METHOD_NOT_ALLOWED,
                            "Termination of a partitioned topic is not allowed");
                }
            })
            .thenCompose(__ -> getTopicReferenceAsync(topicName))
            .thenCompose(topicRef -> {
                if (topicRef instanceof PersistentTopic) {
                    return ((PersistentTopic) topicRef).terminate();
                }
                throw new RestException(Status.METHOD_NOT_ALLOWED,
                        "Termination of a non-persistent topic is not allowed");
            });
}
/**
 * Terminates every partition of a partitioned topic through the admin client and resumes
 * {@code asyncResponse} with a map of partition index to the message id returned for that partition.
 *
 * <p>A topic without partitions is rejected with {@code METHOD_NOT_ALLOWED}. The response is resumed
 * exactly once for the fan-out: with {@code NOT_FOUND} when the aggregated failure is a
 * {@code NotFoundException}, with a generic {@link RestException} for any other failure, and with the
 * collected message ids only when all partitions succeeded.
 *
 * @param asyncResponse response to resume with the result or the error
 * @param authoritative forwarded to the partitioned-topic metadata lookup
 */
protected void internalTerminatePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
    CompletableFuture<Void> future;
    if (topicName.isGlobal()) {
        future = validateGlobalNamespaceOwnershipAsync(namespaceName);
    } else {
        future = CompletableFuture.completedFuture(null);
    }
    future.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.TERMINATE)
            .thenCompose(unused -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
            .thenAccept(partitionMetadata -> {
                if (partitionMetadata.partitions == 0) {
                    String msg = "Termination of a non-partitioned topic is not allowed using partitioned-terminate"
                            + ", please use terminate commands";
                    log.error("[{}] [{}] {}", clientAppId(), topicName, msg);
                    asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, msg));
                    return;
                }
                if (partitionMetadata.partitions > 0) {
                    Map<Integer, MessageId> messageIds = new ConcurrentHashMap<>(partitionMetadata.partitions);
                    final List<CompletableFuture<MessageId>> futures =
                            Lists.newArrayListWithCapacity(partitionMetadata.partitions);
                    for (int i = 0; i < partitionMetadata.partitions; i++) {
                        TopicName topicNamePartition = topicName.getPartition(i);
                        try {
                            int finalI = i;
                            futures.add(pulsar().getAdminClient().topics()
                                    .terminateTopicAsync(topicNamePartition.toString())
                                    .whenComplete((messageId, throwable) -> {
                                        if (throwable != null) {
                                            // Only log here. The aggregate handler below maps the
                                            // failure to a response, and a failed partition has no
                                            // message id to record (ConcurrentHashMap rejects nulls).
                                            log.error("[{}] Failed to terminate topic {}", clientAppId(),
                                                    topicNamePartition, throwable);
                                            return;
                                        }
                                        messageIds.put(finalI, messageId);
                                    }));
                        } catch (Exception e) {
                            log.error("[{}] Failed to terminate topic {}", clientAppId(), topicNamePartition,
                                    e);
                            throw new RestException(e);
                        }
                    }
                    FutureUtil.waitForAll(futures).handle((result, exception) -> {
                        if (exception != null) {
                            Throwable t = FutureUtil.unwrapCompletionException(exception);
                            if (t instanceof NotFoundException) {
                                asyncResponse.resume(new RestException(Status.NOT_FOUND,
                                        getTopicNotFoundErrorMessage(topicName.toString())));
                            } else {
                                log.error("[{}] Failed to terminate topic {}", clientAppId(), topicName, t);
                                asyncResponse.resume(new RestException(t));
                            }
                            // Do not fall through to the success response after reporting an error.
                            return null;
                        }
                        asyncResponse.resume(messageIds);
                        return null;
                    });
                }
            }).exceptionally(ex -> {
                // If the exception is not redirect exception we need to log it.
                if (!isRedirectException(ex)) {
                    log.error("[{}] Failed to terminate topic {}", clientAppId(), topicName, ex);
                }
                resumeAsyncResponseExceptionally(asyncResponse, ex);
                return null;
            })
    ).exceptionally(ex -> {
        // If the exception is not redirect exception we need to log it.
        if (!isRedirectException(ex)) {
            log.error("[{}] Failed to terminate topic {}", clientAppId(), topicName, ex);
        }
        resumeAsyncResponseExceptionally(asyncResponse, ex);
        return null;
    });
}
/**
 * Expires messages on subscription {@code subName} using {@code expireTimeInSeconds} and answers
 * through {@code asyncResponse}.
 *
 * <p>After the global-namespace (for global topics), topic-ownership and {@code EXPIRE_MESSAGES}
 * checks, the partitioned-topic metadata is looked up. When {@code topicName} is itself a partition,
 * or the topic has no partitions, the work is delegated to
 * {@code internalExpireMessagesByTimestampForSinglePartitionAsync}. Otherwise the request is sent to
 * every partition through the admin client and the response is resumed once all of them finished.
 *
 * @param asyncResponse       response to resume with {@code 204 No Content} or the error
 * @param subName             subscription (or replicator cursor) name to expire messages on
 * @param expireTimeInSeconds expiry value forwarded unchanged to the subscription / admin client
 * @param authoritative       forwarded to the ownership validation and the metadata lookup
 */
protected void internalExpireMessagesByTimestamp(AsyncResponse asyncResponse, String subName,
int expireTimeInSeconds, boolean authoritative) {
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
future = CompletableFuture.completedFuture(null);
}
future.thenCompose(__ ->
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES, subName))
.thenCompose(unused2 ->
// If the topic name is a partition name, no need to get partition topic metadata again
getPartitionedTopicMetadataAsync(topicName, authoritative, false)
.thenCompose(partitionMetadata -> {
if (topicName.isPartitioned()) {
// The request targets one specific partition: handle it locally.
return internalExpireMessagesByTimestampForSinglePartitionAsync
(partitionMetadata, subName, expireTimeInSeconds)
.thenAccept(unused3 ->
asyncResponse.resume(Response.noContent().build()));
} else {
if (partitionMetadata.partitions > 0) {
// Partitioned topic: fan the request out to each partition via the admin client.
return CompletableFuture.completedFuture(null).thenAccept(unused -> {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
// expire messages for each partition topic
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar()
.getAdminClient()
.topics()
.expireMessagesAsync(topicNamePartition.toString(),
subName, expireTimeInSeconds));
} catch (Exception e) {
log.error("[{}] Failed to expire messages up to {} on {}",
clientAppId(),
expireTimeInSeconds, topicNamePartition, e);
asyncResponse.resume(new RestException(e));
return;
}
}
// Respond once, after every per-partition request has completed.
FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
getSubNotFoundErrorMessage(topicName.toString(),
subName)));
return null;
} else {
log.error("[{}] Failed to expire messages up "
+ "to {} on {}", clientAppId(),
expireTimeInSeconds, topicName, t);
asyncResponse.resume(new RestException(t));
return null;
}
}
asyncResponse.resume(Response.noContent().build());
return null;
});
});
} else {
// Non-partitioned topic: handle it locally.
return internalExpireMessagesByTimestampForSinglePartitionAsync
(partitionMetadata, subName, expireTimeInSeconds)
.thenAccept(unused ->
asyncResponse.resume(Response.noContent().build()));
}
}
}))
).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(),
expireTimeInSeconds, topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
/**
 * Expires messages by timestamp on the topic referenced by {@code topicName}, which must be either a
 * single partition or a non-partitioned topic.
 *
 * <p>The returned future fails with:
 * <ul>
 *   <li>{@link IllegalStateException} when called for a partitioned topic that is not a partition;</li>
 *   <li>{@code NOT_FOUND} when the topic reference is null, or the replicator / subscription named by
 *       {@code subName} does not exist;</li>
 *   <li>{@code METHOD_NOT_ALLOWED} when the topic is not a {@code PersistentTopic};</li>
 *   <li>{@code CONFLICT} when {@code expireMessages} reports that no expiry was issued.</li>
 * </ul>
 *
 * @param partitionMetadata   metadata of {@code topicName}, used only for the partitioned-topic guard
 * @param subName             subscription name, or a replicator cursor name when it starts with the
 *                            topic's replicator prefix
 * @param expireTimeInSeconds expiry value forwarded unchanged to {@code expireMessages}
 * @return a future completing normally once the expiry was issued
 */
private CompletableFuture<Void> internalExpireMessagesByTimestampForSinglePartitionAsync(
PartitionedTopicMetadata partitionMetadata, String subName, int expireTimeInSeconds) {
if (!topicName.isPartitioned() && partitionMetadata.partitions > 0) {
String msg = "This method should not be called for partitioned topic";
return FutureUtil.failedFuture(new IllegalStateException(msg));
} else {
final CompletableFuture<Void> resultFuture = new CompletableFuture<>();
getTopicReferenceAsync(topicName).thenAccept(t -> {
if (t == null) {
resultFuture.completeExceptionally(new RestException(Status.NOT_FOUND,
getTopicNotFoundErrorMessage(topicName.toString())));
return;
}
if (!(t instanceof PersistentTopic)) {
resultFuture.completeExceptionally(new RestException(Status.METHOD_NOT_ALLOWED,
"Expire messages on a non-persistent topic is not allowed"));
return;
}
PersistentTopic topic = (PersistentTopic) t;
boolean issued;
// Names starting with the replicator prefix address a replicator instead of a subscription.
if (subName.startsWith(topic.getReplicatorPrefix())) {
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
PersistentReplicator repl = (PersistentReplicator) topic
.getPersistentReplicator(remoteCluster);
if (repl == null) {
resultFuture.completeExceptionally(
new RestException(Status.NOT_FOUND, "Replicator not found"));
return;
}
issued = repl.expireMessages(expireTimeInSeconds);
} else {
PersistentSubscription sub = topic.getSubscription(subName);
if (sub == null) {
resultFuture.completeExceptionally(
new RestException(Status.NOT_FOUND,
getSubNotFoundErrorMessage(topicName.toString(), subName)));
return;
}
issued = sub.expireMessages(expireTimeInSeconds);
}
if (issued) {
log.info("[{}] Message expire started up to {} on {} {}", clientAppId(),
expireTimeInSeconds, topicName, subName);
resultFuture.complete(null);
} else {
// expireMessages returned false: report it to the caller as a conflict.
if (log.isDebugEnabled()) {
log.debug("Expire message by timestamp not issued on topic {} for subscription {} "
+ "due to ongoing message expiration not finished or subscription almost"
+ " catch up. If it's performed on a partitioned topic operation might "
+ "succeeded on other partitions, please check stats of individual "
+ "partition.", topicName, subName);
}
resultFuture.completeExceptionally(new RestException(Status.CONFLICT, "Expire message "
+ "by timestamp not issued on topic " + topicName + " for subscription "
+ subName + " due to ongoing message expiration not finished or subscription "
+ "almost catch up. If it's performed on a partitioned topic operation might"
+ " succeeded on other partitions, please check stats of individual partition."
));
return;
}
}).exceptionally(e -> {
// Propagate lookup failures (and anything thrown above) without the CompletionException wrapper.
resultFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
return null;
});
return resultFuture;
}
}
/**
 * Expires messages on subscription {@code subName} up to the position described by {@code messageId}
 * and answers through {@code asyncResponse}.
 *
 * <p>After the global-namespace (for global topics), topic-ownership and {@code EXPIRE_MESSAGES}
 * checks, the request is rejected with {@code METHOD_NOT_ALLOWED} for a partitioned topic that is not
 * addressed by partition, and with {@code PRECONDITION_FAILED} when the partition index of
 * {@code messageId} differs from the one of {@code topicName}. Otherwise the work is handed to
 * {@code internalExpireMessagesNonPartitionedTopicByPosition}, which resumes the response itself.
 *
 * @param asyncResponse response to resume with the result or the error
 * @param subName       subscription (or replicator cursor) name
 * @param authoritative forwarded to the ownership validation and the metadata lookup
 * @param messageId     message id identifying the target position
 * @param isExcluded    forwarded to the position calculation
 * @param batchIndex    forwarded to the position calculation
 */
protected void internalExpireMessagesByPosition(AsyncResponse asyncResponse, String subName, boolean authoritative,
        MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
    CompletableFuture<Void> future;
    if (topicName.isGlobal()) {
        future = validateGlobalNamespaceOwnershipAsync(namespaceName);
    } else {
        future = CompletableFuture.completedFuture(null);
    }
    future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
        .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES, subName))
        .thenCompose(__ -> {
            log.info("[{}][{}] received expire messages on subscription {} to position {}", clientAppId(),
                    topicName, subName, messageId);
            return getPartitionedTopicMetadataAsync(topicName, authoritative, false)
                .thenAccept(partitionMetadata -> {
                    if (!topicName.isPartitioned() && partitionMetadata.partitions > 0) {
                        String msg = "Expire message at position is not supported for partitioned-topic";
                        log.warn("[{}] {} {}({}) {}", clientAppId(), msg, topicName, messageId, subName);
                        asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, msg));
                    } else if (messageId.getPartitionIndex() != topicName.getPartitionIndex()) {
                        String msg = "Invalid parameter for expire message by position, partition index of "
                                + "passed in message position doesn't match partition index for the topic";
                        log.warn("[{}] {} {}({}).", clientAppId(), msg, topicName, messageId);
                        asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, msg));
                    } else {
                        internalExpireMessagesNonPartitionedTopicByPosition(asyncResponse, subName,
                                messageId, isExcluded, batchIndex);
                    }
                });
        }).exceptionally(ex -> {
            // If the exception is not redirect exception we need to log it.
            if (!isRedirectException(ex)) {
                // Placeholders follow the argument order: app id, topic, subscription, position.
                log.error("[{}] Failed to expire messages on topic {} for subscription {} to position {}",
                        clientAppId(), topicName, subName, messageId, ex);
            }
            resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
}
/**
 * Expires messages up to a position on a single (non-partitioned or partition) topic and resumes
 * {@code asyncResponse} with the outcome.
 *
 * <p>Responses: {@code NOT_FOUND} when the topic reference is null, the subscription is missing, or
 * the replicator addressed by {@code subName} is missing; {@code METHOD_NOT_ALLOWED} when the topic is
 * not a {@code PersistentTopic}; {@code CONFLICT} when {@code expireMessages} reports that no expiry
 * was issued; {@code 204 No Content} on success.
 *
 * @param asyncResponse response to resume with the result or the error
 * @param subName       subscription name, or a replicator cursor name when it starts with the topic's
 *                      replicator prefix
 * @param messageId     message id identifying the target position
 * @param isExcluded    forwarded to {@code calculatePositionAckSet}
 * @param batchIndex    forwarded to {@code getEntryBatchSize} and {@code calculatePositionAckSet}
 * @return the future of the topic lookup stage; the HTTP response is delivered via
 *         {@code asyncResponse}, not via this future
 */
private CompletableFuture<Void> internalExpireMessagesNonPartitionedTopicByPosition(AsyncResponse asyncResponse,
        String subName,
        MessageIdImpl messageId,
        boolean isExcluded,
        int batchIndex) {
    return getTopicReferenceAsync(topicName).thenAccept(t -> {
        if (t == null) {
            asyncResponse.resume(new RestException(Status.NOT_FOUND,
                    getTopicNotFoundErrorMessage(topicName.toString())));
            return;
        }
        // Check the type before casting, mirroring the by-timestamp variant of this operation,
        // so a non-persistent topic gets a clear 405 instead of a ClassCastException.
        if (!(t instanceof PersistentTopic)) {
            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
                    "Expire messages on a non-persistent topic is not allowed"));
            return;
        }
        PersistentTopic topic = (PersistentTopic) t;
        try {
            PersistentSubscription sub = topic.getSubscription(subName);
            if (sub == null) {
                asyncResponse.resume(new RestException(Status.NOT_FOUND,
                        getSubNotFoundErrorMessage(topicName.toString(), subName)));
                return;
            }
            CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>();
            getEntryBatchSize(batchSizeFuture, topic, messageId, batchIndex);
            batchSizeFuture.thenAccept(bi -> {
                PositionImpl position = calculatePositionAckSet(isExcluded, bi, batchIndex, messageId);
                boolean issued;
                try {
                    if (subName.startsWith(topic.getReplicatorPrefix())) {
                        String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
                        PersistentReplicator repl = (PersistentReplicator)
                                topic.getPersistentReplicator(remoteCluster);
                        if (repl == null) {
                            asyncResponse.resume(new RestException(Status.NOT_FOUND,
                                    "Replicator not found"));
                            return;
                        }
                        issued = repl.expireMessages(position);
                    } else {
                        issued = sub.expireMessages(position);
                    }
                    if (issued) {
                        log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), position,
                                topicName, subName);
                    } else {
                        if (log.isDebugEnabled()) {
                            log.debug("Expire message by position not issued on topic {} for subscription {} "
                                    + "due to ongoing message expiration not finished or subscription almost "
                                    + "catch up.", topicName, subName);
                        }
                        throw new RestException(Status.CONFLICT, "Expire message by position not issued on topic "
                                + topicName + " for subscription " + subName + " due to ongoing"
                                + " message expiration not finished or invalid message position provided.");
                    }
                } catch (Exception exception) {
                    log.error("[{}] Failed to expire messages up to {} on {} with subscription {}",
                            clientAppId(), position, topicName, subName, exception);
                    throw new RestException(exception);
                }
                asyncResponse.resume(Response.noContent().build());
            }).exceptionally(e -> {
                log.error("[{}] Failed to expire messages up to {} on {} with subscription {}",
                        clientAppId(), messageId, topicName, subName, e);
                // Go through the shared helper used by the other handlers in this class rather
                // than resuming with the raw exception from the future chain.
                resumeAsyncResponseExceptionally(asyncResponse, e);
                return null;
            });
        } catch (Exception e) {
            log.warn("[{}][{}] Failed to expire messages up to {} on subscription {} to position {}",
                    clientAppId(), topicName, messageId, subName, messageId, e);
            resumeAsyncResponseExceptionally(asyncResponse, e);
        }
    }).exceptionally(ex -> {
        // unwrapCompletionException never yields null, unlike ex.getCause() on an unwrapped exception.
        Throwable cause = FutureUtil.unwrapCompletionException(ex);
        log.error("[{}] Failed to expire messages on topic {} for subscription {} to position {}",
                clientAppId(), topicName, subName, messageId, cause);
        resumeAsyncResponseExceptionally(asyncResponse, cause);
        return null;
    });
}
/**
 * Triggers compaction on {@code topicName} and answers through {@code asyncResponse}.
 *
 * <p>When {@code topicName} is a partition name, or the topic turns out to have no partitions, the
 * call is delegated to {@code internalTriggerCompactionNonPartitionedTopic}. For a partitioned topic,
 * compaction is triggered on every partition through the admin client and the response is resumed
 * once all of them finished.
 *
 * @param asyncResponse response to resume with {@code 204 No Content} or the error
 * @param authoritative forwarded to the metadata lookup and the non-partitioned handler
 */
protected void internalTriggerCompaction(AsyncResponse asyncResponse, boolean authoritative) {
log.info("[{}] Trigger compaction on topic {}", clientAppId(), topicName);
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
future = CompletableFuture.completedFuture(null);
}
future.thenAccept(__ -> {
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
internalTriggerCompactionNonPartitionedTopic(asyncResponse, authoritative);
} else {
getPartitionedTopicMetadataAsync(topicName, authoritative, false)
.thenAccept(partitionMetadata -> {
final int numPartitions = partitionMetadata.partitions;
if (numPartitions > 0) {
// Partitioned topic: trigger compaction on each partition via the admin client.
final List<CompletableFuture<Void>> futures = Lists.newArrayListWithCapacity(numPartitions);
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar()
.getAdminClient()
.topics()
.triggerCompactionAsync(topicNamePartition.toString()));
} catch (Exception e) {
log.error("[{}] Failed to trigger compaction on topic {}",
clientAppId(), topicNamePartition, e);
asyncResponse.resume(new RestException(e));
return;
}
}
// Respond once, after every per-partition request has completed.
FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable th = exception.getCause();
if (th instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, th.getMessage()));
return null;
} else if (th instanceof WebApplicationException) {
// Already an HTTP-level error: pass it through unchanged.
asyncResponse.resume(th);
return null;
} else {
log.error("[{}] Failed to trigger compaction on topic {}",
clientAppId(), topicName, exception);
asyncResponse.resume(new RestException(exception));
return null;
}
}
asyncResponse.resume(Response.noContent().build());
return null;
});
} else {
internalTriggerCompactionNonPartitionedTopic(asyncResponse, authoritative);
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.error("[{}] Failed to validate global namespace ownership to trigger compaction on topic {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
/**
 * Triggers compaction on a single topic after validating ownership and the {@code COMPACT}
 * operation, and resumes {@code asyncResponse} with the outcome.
 *
 * <p>An {@code AlreadyRunningException} is reported as {@code CONFLICT}; any other exception raised
 * while triggering is wrapped in a {@link RestException}.
 *
 * @param asyncResponse response to resume with {@code 204 No Content} or the error
 * @param authoritative forwarded to the ownership validation
 */
protected void internalTriggerCompactionNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
    validateTopicOwnershipAsync(topicName, authoritative)
            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.COMPACT))
            .thenCompose(__ -> getTopicReferenceAsync(topicName))
            .thenAccept(topicRef -> {
                try {
                    PersistentTopic persistentTopic = (PersistentTopic) topicRef;
                    persistentTopic.triggerCompaction();
                    asyncResponse.resume(Response.noContent().build());
                } catch (AlreadyRunningException alreadyRunning) {
                    RestException conflict = new RestException(Status.CONFLICT, alreadyRunning.getMessage());
                    resumeAsyncResponseExceptionally(asyncResponse, conflict);
                } catch (Exception failure) {
                    log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(),
                            topicName, failure);
                    resumeAsyncResponseExceptionally(asyncResponse, new RestException(failure));
                }
            })
            .exceptionally(error -> {
                // Redirects are expected control flow; only log real failures.
                boolean redirect = isRedirectException(error);
                if (!redirect) {
                    log.error("[{}] Failed to trigger compaction for {}", clientAppId(), topicName, error);
                }
                resumeAsyncResponseExceptionally(asyncResponse, error);
                return null;
            });
}
/**
 * Returns the compaction status of {@code topicName} after validating ownership and the
 * {@code COMPACT} operation.
 *
 * @param authoritative forwarded to the ownership validation
 * @return a future holding the value of {@code PersistentTopic.compactionStatus()}
 */
protected CompletableFuture<LongRunningProcessStatus> internalCompactionStatusAsync(boolean authoritative) {
    return validateTopicOwnershipAsync(topicName, authoritative)
            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.COMPACT))
            .thenCompose(__ -> getTopicReferenceAsync(topicName))
            .thenApply(topicRef -> {
                PersistentTopic persistentTopic = (PersistentTopic) topicRef;
                return persistentTopic.compactionStatus();
            });
}
/**
 * Triggers an offload on {@code topicName} up to {@code messageId} after validating ownership and the
 * {@code OFFLOAD} operation, and resumes {@code asyncResponse} with the outcome.
 *
 * <p>An {@code AlreadyRunningException} is reported as {@code CONFLICT}; any other exception raised
 * while triggering is wrapped in a {@link RestException}.
 *
 * @param asyncResponse response to resume with {@code 204 No Content} or the error
 * @param authoritative forwarded to the ownership validation
 * @param messageId     message id forwarded to {@code PersistentTopic.triggerOffload}
 */
protected void internalTriggerOffload(AsyncResponse asyncResponse,
        boolean authoritative, MessageIdImpl messageId) {
    validateTopicOwnershipAsync(topicName, authoritative)
            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.OFFLOAD))
            .thenCompose(__ -> getTopicReferenceAsync(topicName))
            .thenAccept(topicRef -> {
                try {
                    PersistentTopic persistentTopic = (PersistentTopic) topicRef;
                    persistentTopic.triggerOffload(messageId);
                    asyncResponse.resume(Response.noContent().build());
                } catch (AlreadyRunningException alreadyRunning) {
                    RestException conflict = new RestException(Status.CONFLICT, alreadyRunning.getMessage());
                    resumeAsyncResponseExceptionally(asyncResponse, conflict);
                } catch (Exception failure) {
                    log.warn("Unexpected error triggering offload", failure);
                    resumeAsyncResponseExceptionally(asyncResponse, new RestException(failure));
                }
            })
            .exceptionally(error -> {
                // Redirects are expected control flow; only log real failures.
                boolean redirect = isRedirectException(error);
                if (!redirect) {
                    log.error("[{}] Failed to trigger offload for {}", clientAppId(), topicName, error);
                }
                resumeAsyncResponseExceptionally(asyncResponse, error);
                return null;
            });
}
/**
 * Resumes {@code asyncResponse} with the offload status of {@code topicName} after validating
 * ownership and the {@code OFFLOAD} operation.
 *
 * @param asyncResponse response to resume with the {@code OffloadProcessStatus} or the error
 * @param authoritative forwarded to the ownership validation
 */
protected void internalOffloadStatus(AsyncResponse asyncResponse, boolean authoritative) {
    validateTopicOwnershipAsync(topicName, authoritative)
            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.OFFLOAD))
            .thenCompose(__ -> getTopicReferenceAsync(topicName))
            .thenAccept(topicRef -> {
                PersistentTopic persistentTopic = (PersistentTopic) topicRef;
                OffloadProcessStatus status = persistentTopic.offloadStatus();
                asyncResponse.resume(status);
            })
            .exceptionally(error -> {
                // Redirects are expected control flow; only log real failures.
                boolean redirect = isRedirectException(error);
                if (!redirect) {
                    log.error("[{}] Failed to offload status on topic {}", clientAppId(), topicName, error);
                }
                resumeAsyncResponseExceptionally(asyncResponse, error);
                return null;
            });
}
/**
 * Fetches the partitioned-topic metadata of {@code topicName} after an authorization check.
 *
 * <p>Authorization is attempted with {@code checkAuthorizationAsync}; when that fails with a
 * {@code RestException}, tenant admin access is tried as a fallback. If the fallback also fails with
 * a {@code RestException}, the returned future fails with a {@code PulsarClientException}; any other
 * failure is propagated unwrapped. Once authorized, the namespace's local/peer replication cluster is
 * checked and the metadata is fetched from the broker service.
 *
 * @param pulsar             broker service handle used for authorization and metadata lookup
 * @param clientAppId        caller identity, used for authorization and log messages
 * @param originalPrincipal  original principal, used by the tenant admin fallback check
 * @param authenticationData authentication data passed to both authorization checks
 * @param topicName          topic whose metadata is requested
 * @return a future holding the metadata, or failing with the unwrapped cause
 */
public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
PulsarService pulsar, String clientAppId, String originalPrincipal,
AuthenticationDataSource authenticationData, TopicName topicName) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
CompletableFuture<Void> authorizationFuture = new CompletableFuture<>();
checkAuthorizationAsync(pulsar, topicName, clientAppId, authenticationData)
.thenRun(() -> authorizationFuture.complete(null))
.exceptionally(e -> {
Throwable throwable = FutureUtil.unwrapCompletionException(e);
if (throwable instanceof RestException) {
// Topic-level authorization was refused: fall back to tenant admin access.
validateAdminAccessForTenantAsync(pulsar,
clientAppId, originalPrincipal, topicName.getTenant(), authenticationData)
.thenRun(() -> {
authorizationFuture.complete(null);
}).exceptionally(ex -> {
Throwable throwable2 = FutureUtil.unwrapCompletionException(ex);
if (throwable2 instanceof RestException) {
log.warn("Failed to authorize {} on topic {}", clientAppId, topicName);
authorizationFuture.completeExceptionally(new PulsarClientException(
String.format("Authorization failed %s on topic %s with error %s",
clientAppId, topicName, throwable2.getMessage())));
} else {
authorizationFuture.completeExceptionally(throwable2);
}
return null;
});
} else {
// throw without wrapping to PulsarClientException that considers: unknown error marked as
// internal server error
log.warn("Failed to authorize {} on topic {}", clientAppId, topicName, throwable);
authorizationFuture.completeExceptionally(throwable);
}
return null;
});
// validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
// producer/consumer
authorizationFuture.thenCompose(__ ->
checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject()))
.thenCompose(res ->
pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
.thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
metadata.partitions);
}
metadataFuture.complete(metadata);
})
.exceptionally(e -> {
// Surface the underlying cause rather than the CompletionException wrapper.
metadataFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
return null;
});
return metadataFuture;
}
/**
 * Get partitioned topic metadata without checking the permission.
 *
 * @param pulsar    the broker instance used for the cluster check and the metadata lookup
 * @param topicName topic whose metadata is requested
 * @return a future completed with the metadata, or completed exceptionally with the unwrapped failure
 */
public static CompletableFuture<PartitionedTopicMetadata> unsafeGetPartitionedTopicMetadataAsync(
        PulsarService pulsar, TopicName topicName) {
    CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
    // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
    // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
    // producer/consumer
    checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())
            .thenCompose(res -> pulsar.getBrokerService()
                    .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
            .thenAccept(metadata -> {
                if (log.isDebugEnabled()) {
                    log.debug("Total number of partitions for topic {} is {}", topicName,
                            metadata.partitions);
                }
                metadataFuture.complete(metadata);
            }).exceptionally(ex -> {
                // Unwrap instead of calling getCause() directly: a throwable without a cause would yield
                // null, completeExceptionally(null) would throw, and the returned future would never complete.
                metadataFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
                return null;
            });
    return metadataFuture;
}
/**
 * Looks up the topic on this broker without creating it.
 *
 * @param topicName topic to look up
 * @return a future with the topic when it exists; otherwise the failed future produced by
 *         {@code topicNotFoundReasonAsync}
 */
private CompletableFuture<Topic> getTopicReferenceAsync(TopicName topicName) {
    return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
            .thenCompose(maybeTopic -> {
                if (maybeTopic.isPresent()) {
                    return CompletableFuture.completedFuture(maybeTopic.get());
                }
                return topicNotFoundReasonAsync(topicName);
            });
}
/**
 * Builds the failure explaining why {@code topicName} could not be found.
 *
 * <p>The returned future never completes normally: every path ends in a {@link RestException} with
 * status NOT_FOUND whose message depends on whether the name is a partition, whether the partitioned
 * topic has metadata/partitions, and whether the partition appears in the topic list.
 *
 * @param topicName the topic that was not found
 * @return a future that completes exceptionally with the most specific not-found reason
 */
private CompletableFuture<Topic> topicNotFoundReasonAsync(TopicName topicName) {
    if (topicName.isPartitioned()) {
        return getPartitionedTopicMetadataAsync(
                TopicName.get(topicName.getPartitionedTopicName()), false, false)
                .thenAccept(metadata -> {
                    final String topicErrorType;
                    if (metadata == null) {
                        topicErrorType = "has no metadata";
                    } else if (metadata.partitions == 0) {
                        topicErrorType = "has zero partitions";
                    } else {
                        // The partitioned topic exists; continue with the partition listing check.
                        return;
                    }
                    throw new RestException(Status.NOT_FOUND, String.format(
                            "Partitioned Topic not found: %s %s", topicName.toString(), topicErrorType));
                })
                .thenCompose(ignore -> internalGetListAsync(Optional.empty()))
                .thenApply(topics -> {
                    final String reason = topics.contains(topicName.toString())
                            ? getPartitionedTopicNotFoundErrorMessage(topicName.toString())
                            : "Topic partitions were not yet created";
                    throw new RestException(Status.NOT_FOUND, reason);
                });
    }
    return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
            getTopicNotFoundErrorMessage(topicName.toString())));
}
/**
 * Get the Subscription object reference from the Topic reference.
 *
 * <p>If the subscription does not exist yet it is created at the earliest position, blocking until
 * the creation completes.
 *
 * @param subName name of the subscription
 * @param topic   topic holding (or receiving) the subscription
 * @return the existing or newly created subscription, never {@code null}
 * @throws RestException with status NOT_FOUND if the subscription cannot be obtained or created
 */
private Subscription getSubscriptionReference(String subName, PersistentTopic topic) {
    try {
        Subscription sub = topic.getSubscription(subName);
        if (sub == null) {
            sub = topic.createSubscription(subName,
                    InitialPosition.Earliest, false, null).get();
        }
        return checkNotNull(sub);
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // The blocking get() was interrupted: restore the flag so callers up the stack can see it.
            Thread.currentThread().interrupt();
        }
        throw new RestException(Status.NOT_FOUND, getSubNotFoundErrorMessage(topicName.toString(), subName));
    }
}
/**
 * Get the Replicator object reference from the Topic reference.
 *
 * @param replName replicator name from which the remote cluster is derived
 * @param topic    topic owning the replicator
 * @return the persistent replicator for the remote cluster, never {@code null}
 * @throws RestException with status NOT_FOUND if the lookup fails for any reason
 */
private PersistentReplicator getReplicatorReference(String replName, PersistentTopic topic) {
    try {
        final String cluster = PersistentReplicator.getRemoteCluster(replName);
        final PersistentReplicator replicator = (PersistentReplicator) topic.getPersistentReplicator(cluster);
        return checkNotNull(replicator);
    } catch (Exception lookupFailure) {
        // Any failure (including a missing replicator) is reported to the caller as "not found".
        throw new RestException(Status.NOT_FOUND, "Replicator not found");
    }
}
/**
 * Creates subscriptions on the new partitions and then stores the new partition count in the
 * partitioned-topic metadata.
 *
 * <p>If the metadata update fails, a best-effort cleanup deletes the partitions in the range
 * {@code [current partitions, numPartitions)}; cleanup failures are only logged.
 *
 * @param topicName     the partitioned topic to update
 * @param numPartitions the new number of partitions
 * @param force         when {@code true}, a failure caused by a
 *                      {@code PulsarAdminException.ConflictException} is treated as success
 * @return a future completed when the update finished, or completed exceptionally on failure
 */
private CompletableFuture<Void> updatePartitionedTopic(TopicName topicName, int numPartitions, boolean force) {
    final CompletableFuture<Void> done = new CompletableFuture<>();
    createSubscriptions(topicName, numPartitions)
            .thenCompose(ignore -> {
                final CompletableFuture<Void> metadataUpdate = namespaceResources()
                        .getPartitionedTopicResources()
                        .updatePartitionedTopicAsync(topicName, current ->
                                new PartitionedTopicMetadata(numPartitions, current.properties));
                metadataUpdate.exceptionally(updateFailure -> {
                    // If the update operation fails, clean up the partitions that were created
                    getPartitionedTopicMetadataAsync(topicName, false, false)
                            .thenAccept(metadata -> {
                                final int firstNewPartition = metadata.partitions;
                                for (int idx = firstNewPartition; idx < numPartitions; idx++) {
                                    topicResources().deletePersistentTopicAsync(topicName.getPartition(idx))
                                            .exceptionally(deleteFailure -> {
                                                log.warn("[{}] Failed to clean up managedLedger {}",
                                                        clientAppId(), topicName, deleteFailure.getCause());
                                                return null;
                                            });
                                }
                            })
                            .exceptionally(cleanupFailure -> {
                                log.warn("[{}] Failed to clean up managedLedger", topicName, cleanupFailure);
                                return null;
                            });
                    return null;
                });
                return metadataUpdate;
            })
            .thenAccept(ignore -> done.complete(null))
            .exceptionally(failure -> {
                final boolean tolerated =
                        force && failure.getCause() instanceof PulsarAdminException.ConflictException;
                if (tolerated) {
                    done.complete(null);
                } else {
                    done.completeExceptionally(failure);
                }
                return null;
            });
    return done;
}
/**
 * It creates subscriptions for new partitions of existing partitioned-topics.
 *
 * <p>The durable subscriptions found in the stats of partition 0 are re-created, at the latest
 * position, on every partition in the range {@code [existing partitions, numPartitions)}.
 * Non-durable subscriptions are skipped. If partition 0 does not exist the call succeeds without
 * creating anything.
 *
 * @param topicName     : topic-name: persistent://prop/cluster/ns/topic
 * @param numPartitions : number partitions for the topics
 * @return a future completed when all subscriptions were created; it fails with a CONFLICT
 *         {@link RestException} when the topic is not partitioned or {@code numPartitions} is not
 *         greater than the existing partition count
 */
private CompletableFuture<Void> createSubscriptions(TopicName topicName, int numPartitions) {
    CompletableFuture<Void> result = new CompletableFuture<>();
    pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenAccept(partitionMetadata -> {
        if (partitionMetadata.partitions < 1) {
            result.completeExceptionally(new RestException(Status.CONFLICT, "Topic is not partitioned topic"));
            return;
        }
        if (partitionMetadata.partitions >= numPartitions) {
            result.completeExceptionally(new RestException(Status.CONFLICT,
                    "number of partitions must be more than existing " + partitionMetadata.partitions));
            return;
        }
        PulsarAdmin admin;
        try {
            admin = pulsar().getAdminClient();
        } catch (PulsarServerException e1) {
            result.completeExceptionally(e1);
            return;
        }
        // Partition 0 serves as the template for the subscriptions to replicate.
        admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats -> {
            List<CompletableFuture<Void>> subscriptionFutures = new ArrayList<>();
            stats.getSubscriptions().entrySet().forEach(e -> {
                String subscription = e.getKey();
                SubscriptionStats ss = e.getValue();
                if (!ss.isDurable()) {
                    // We must not re-create non-durable subscriptions on the new partitions
                    return;
                }
                for (int i = partitionMetadata.partitions; i < numPartitions; i++) {
                    final String topicNamePartition = topicName.getPartition(i).toString();
                    subscriptionFutures.add(admin.topics().createSubscriptionAsync(topicNamePartition,
                            subscription, MessageId.latest));
                }
            });
            FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> {
                log.info("[{}] Successfully created new partitions {}", clientAppId(), topicName);
                result.complete(null);
            }).exceptionally(ex -> {
                log.warn("[{}] Failed to create subscriptions on new partitions for {}",
                        clientAppId(), topicName, ex);
                result.completeExceptionally(ex);
                return null;
            });
        }).exceptionally(ex -> {
            if (ex.getCause() instanceof PulsarAdminException.NotFoundException) {
                // The first partition doesn't exist, so there are currently no subscriptions to recreate
                result.complete(null);
            } else {
                log.warn("[{}] Failed to get list of subscriptions of {}",
                        clientAppId(), topicName.getPartition(0), ex);
                result.completeExceptionally(ex);
            }
            return null;
        });
    }).exceptionally(ex -> {
        // Pass the exception to the logger so the root cause is not lost, like the other logs here do.
        log.warn("[{}] Failed to get partition metadata for {}",
                clientAppId(), topicName.toString(), ex);
        result.completeExceptionally(ex);
        return null;
    });
    return result;
}
// as described at : (PR: #836) CPP-client old client lib should not be allowed to connect on partitioned-topic.
// So, all requests from old-cpp-client (< v1.21) must be rejected.
// Pulsar client-java lib always passes user-agent as X-Java-$version.
// However, cpp-client older than v1.20 (PR #765) never used to pass it.
// So, request without user-agent and Pulsar-CPP-vX (X < 1.21) must be rejected
/**
 * Validates the client library version announced in the {@code User-Agent} header.
 *
 * <p>The check is skipped when it is disabled in the broker configuration. A blank user agent is
 * rejected. A user agent containing {@code DEPRECATED_CLIENT_VERSION_PREFIX} is rejected when its
 * {@code major.minor} version is older than {@code LEAST_SUPPORTED_CLIENT_VERSION_PREFIX}. A version
 * that cannot be parsed is logged and accepted.
 *
 * @return a completed future when the client is accepted, otherwise a future failed with a
 *         METHOD_NOT_ALLOWED {@link RestException}
 */
protected CompletableFuture<Void> internalValidateClientVersionAsync() {
    if (!pulsar().getConfiguration().isClientLibraryVersionCheckEnabled()) {
        return CompletableFuture.completedFuture(null);
    }
    final String userAgent = httpRequest.getHeader("User-Agent");
    if (StringUtils.isBlank(userAgent)) {
        return FutureUtil.failedFuture(new RestException(Status.METHOD_NOT_ALLOWED,
                "Client lib is not compatible to"
                        + " access partitioned metadata: version in user-agent is not present"));
    }
    // cpp-client versions older than the least supported version are not allowed
    if (userAgent.contains(DEPRECATED_CLIENT_VERSION_PREFIX)) {
        try {
            String[] tokens = userAgent.split(DEPRECATED_CLIENT_VERSION_PREFIX);
            String[] splits = tokens.length > 1 ? tokens[1].split("-")[0].trim().split("\\.") : null;
            if (splits != null && splits.length > 1) {
                final long leastMajor = LEAST_SUPPORTED_CLIENT_VERSION_PREFIX.getMajorVersion();
                final long leastMinor = LEAST_SUPPORTED_CLIENT_VERSION_PREFIX.getMinorVersion();
                final int major = Integer.parseInt(splits[0]);
                // The minor version only matters when the major versions are equal; comparing it
                // unconditionally would wrongly reject a newer major with a small minor (e.g. 2.0 vs 1.21).
                if (major < leastMajor
                        || (major == leastMajor && Integer.parseInt(splits[1]) < leastMinor)) {
                    return FutureUtil.failedFuture(new RestException(Status.METHOD_NOT_ALLOWED,
                            "Client lib is not compatible to access partitioned metadata: version " + userAgent
                                    + " is not supported"));
                }
            }
        } catch (Exception e) {
            // Deliberately lenient: an unparsable version is accepted.
            log.warn("[{}] Failed to parse version {} ", clientAppId(), userAgent);
        }
    }
    return CompletableFuture.completedFuture(null);
}
/**
 * Validate update of number of partition for partitioned topic.
 * If there's already non partition topic with same name and contains partition suffix "-partition-"
 * followed by numeric value X then the new number of partition of that partitioned topic can not be greater
 * than that X else that non partition topic will essentially be overwritten and cause unexpected consequence.
 *
 * @param topicName         local name of the partitioned topic being updated
 * @param numberOfPartition the requested new number of partitions
 * @return a future that fails with a PRECONDITION_FAILED {@link RestException} when a conflicting
 *         topic exists, and completes normally otherwise
 */
private CompletableFuture<Void> validatePartitionTopicUpdateAsync(String topicName, int numberOfPartition) {
    return internalGetListAsync().thenCompose(existingTopicList -> {
        TopicName partitionTopicName = TopicName.get(domain(), namespaceName, topicName);
        String prefix = partitionTopicName.getPartitionedTopicName() + TopicName.PARTITIONED_TOPIC_SUFFIX;
        return getPartitionedTopicMetadataAsync(partitionTopicName, false, false)
                .thenAccept(metadata -> {
                    int oldPartition = metadata.partitions;
                    for (String existingTopicName : existingTopicList) {
                        if (!existingTopicName.startsWith(prefix)) {
                            continue;
                        }
                        final long suffix;
                        try {
                            // Take everything after the matched prefix. Searching for the first
                            // "-partition-" occurrence instead would pick the wrong position when the
                            // base topic name itself contains the partition suffix.
                            suffix = Long.parseLong(existingTopicName.substring(prefix.length()));
                        } catch (NumberFormatException e) {
                            // Do nothing, if value after partition suffix is not pure numeric value,
                            // as it can't conflict with internal created partitioned topic's name.
                            continue;
                        }
                        // Skip partition of partitioned topic by making sure
                        // the numeric suffix greater than old partition number.
                        // NOTE(review): the upper bound is inclusive although partition indexes end at
                        // numberOfPartition - 1; kept as is, confirm whether this is intended.
                        if (suffix >= oldPartition && suffix <= (long) numberOfPartition) {
                            log.warn(
                                    "[{}] Already have non partition topic {} which contains partition"
                                            + " suffix '-partition-' and end with numeric value smaller"
                                            + " than the new number of partition. Update of partitioned"
                                            + " topic {} could cause conflict.",
                                    clientAppId(),
                                    existingTopicName, topicName);
                            throw new RestException(Status.PRECONDITION_FAILED,
                                    "Already have non partition topic " + existingTopicName
                                            + " which contains partition suffix '-partition-' "
                                            + "and end with numeric value"
                                            + " smaller than the new number of partition. Update of"
                                            + " partitioned topic " + topicName + " could cause conflict.");
                        }
                    }
                });
    });
}
/**
 * Validate non partition topic name,
 * Validation will fail and throw RestException if
 * 1) Topic name contains partition suffix "-partition-" and the remaining part follow the partition
 * suffix is numeric value larger than the number of partition if there's already a partition topic with same
 * name(the part before suffix "-partition-").
 * 2)Topic name contains partition suffix "-partition-" and the remaining part follow the partition
 * suffix is numeric value but there isn't a partitioned topic with same name.
 *
 * @param topicName local name of the non-partitioned topic to validate
 * @return a future that completes normally when the name is acceptable and fails with a
 *         PRECONDITION_FAILED {@link RestException} otherwise
 */
private CompletableFuture<Void> validateNonPartitionTopicNameAsync(String topicName) {
    // Names without the partition suffix can never clash with a partition.
    if (!topicName.contains(TopicName.PARTITIONED_TOPIC_SUFFIX)) {
        return CompletableFuture.completedFuture(null);
    }
    try {
        // First check if what's after suffix "-partition-" is number or not, if not number then can create.
        final int suffixStart = topicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX);
        final String afterSuffix = topicName.substring(suffixStart + TopicName.PARTITIONED_TOPIC_SUFFIX.length());
        final long suffix = Long.parseLong(afterSuffix);
        final TopicName partitionTopicName = TopicName.get(domain(),
                namespaceName, topicName.substring(0, suffixStart));
        return getPartitionedTopicMetadataAsync(partitionTopicName, false, false)
                .thenAccept(metadata -> {
                    final int partitions = metadata.partitions;
                    if (partitions == 0) {
                        log.warn("[{}] Can't create topic {} with \"-partition-\" followed by"
                                        + " numeric value if there isn't a partitioned topic {} created.",
                                clientAppId(), topicName, partitionTopicName.getLocalName());
                        throw new RestException(Status.PRECONDITION_FAILED,
                                "Can't create topic " + topicName + " with \"-partition-\" followed by"
                                        + " numeric value if there isn't a partitioned topic "
                                        + partitionTopicName.getLocalName() + " created.");
                    }
                    // Partition topic index is 0 to (number of partition - 1)
                    if (partitions > 0 && suffix >= (long) partitions) {
                        log.warn("[{}] Can't create topic {} with \"-partition-\" followed by"
                                        + " a number smaller then number of partition of partitioned topic {}.",
                                clientAppId(), topicName, partitionTopicName.getLocalName());
                        throw new RestException(Status.PRECONDITION_FAILED,
                                "Can't create topic " + topicName + " with \"-partition-\" followed by"
                                        + " a number smaller then number of partition of partitioned topic "
                                        + partitionTopicName.getLocalName());
                    }
                    // If there is a partitioned topic with the same name and numeric suffix is smaller
                    // than the number of partition for that partitioned topic, validation will pass.
                });
    } catch (NumberFormatException notNumeric) {
        // Do nothing, if value after partition suffix is not pure numeric value,
        // as it can't conflict if user want to create partitioned topic with same
        // topic name prefix in the future.
        return CompletableFuture.completedFuture(null);
    }
}
/**
 * Resumes {@code asyncResponse} with the last message id of the current topic.
 *
 * <p>Topic ownership and the PEEK_MESSAGES operation are validated first. The request is answered
 * with NOT_FOUND when no topic is resolved and with METHOD_NOT_ALLOWED for topics that are not
 * {@code PersistentTopic} instances.
 *
 * @param asyncResponse response to resume with the message id or the error
 * @param authoritative passed through to the topic ownership validation
 */
protected void internalGetLastMessageId(AsyncResponse asyncResponse, boolean authoritative) {
    validateTopicOwnershipAsync(topicName, authoritative)
            .thenCompose(ignore -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES))
            .thenCompose(ignore -> getTopicReferenceAsync(topicName))
            .thenAccept(topic -> {
                if (topic == null) {
                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
                            getTopicNotFoundErrorMessage(topicName.toString())));
                } else if (!(topic instanceof PersistentTopic)) {
                    log.error("[{}] Not supported operation of non-persistent topic {}", clientAppId(), topicName);
                    asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
                            "GetLastMessageId on a non-persistent topic is not allowed"));
                } else {
                    topic.getLastMessageId().whenComplete((lastId, failure) -> {
                        if (failure == null) {
                            asyncResponse.resume(lastId);
                        } else {
                            asyncResponse.resume(
                                    new RestException(Status.INTERNAL_SERVER_ERROR, failure.getMessage()));
                        }
                    });
                }
            })
            .exceptionally(failure -> {
                // If the exception is not redirect exception we need to log it.
                if (!isRedirectException(failure)) {
                    log.error("[{}] Failed to get last messageId {}", clientAppId(), topicName, failure);
                }
                resumeAsyncResponseExceptionally(asyncResponse, failure);
                return null;
            });
}
/**
 * Reads the topic-level dispatch rate.
 *
 * @param applied  when {@code true} and no topic-level rate is set, fall back to the namespace rate
 *                 for the local cluster and then to {@code dispatchRate()}
 * @param isGlobal selects which topic policies are read
 * @return a future with the resolved rate, or {@code null} when nothing is set and
 *         {@code applied} is {@code false}
 */
protected CompletableFuture<DispatchRateImpl> internalGetDispatchRate(boolean applied, boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenApply(policies -> policies.map(TopicPolicies::getDispatchRate)
                    .orElseGet(() -> {
                        if (!applied) {
                            return null;
                        }
                        final DispatchRateImpl fromNamespace = getNamespacePolicies(namespaceName)
                                .topicDispatchRate.get(pulsar().getConfiguration().getClusterName());
                        if (fromNamespace != null) {
                            return fromNamespace;
                        }
                        return dispatchRate();
                    }));
}
/**
 * Stores the topic-level dispatch rate; does nothing when {@code dispatchRate} is {@code null}.
 *
 * @param dispatchRate the rate to store
 * @param isGlobal     selects which topic policies are read and is recorded on the stored policies
 * @return a future completed once the policies were updated
 */
protected CompletableFuture<Void> internalSetDispatchRate(DispatchRateImpl dispatchRate, boolean isGlobal) {
    if (dispatchRate == null) {
        return CompletableFuture.completedFuture(null);
    }
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        // Start from the stored policies, or from fresh ones when none exist yet.
        final TopicPolicies policies = existing.orElseGet(TopicPolicies::new);
        policies.setIsGlobal(isGlobal);
        policies.setDispatchRate(dispatchRate);
        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
    });
}
/**
 * Clears the topic-level dispatch rate; does nothing when the topic has no policies.
 *
 * @param isGlobal selects which topic policies are read and is recorded on the stored policies
 * @return a future completed once the policies were updated
 */
protected CompletableFuture<Void> internalRemoveDispatchRate(boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        if (existing.isPresent()) {
            final TopicPolicies policies = existing.get();
            policies.setIsGlobal(isGlobal);
            policies.setDispatchRate(null);
            return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
        }
        return CompletableFuture.completedFuture(null);
    });
}
/**
 * Reads the topic-level subscription dispatch rate.
 *
 * @param applied  when {@code true} and no topic-level rate is set, fall back to the namespace rate
 *                 for the local cluster and then to {@code subscriptionDispatchRate()}
 * @param isGlobal selects which topic policies are read
 * @return a future with the resolved rate, or {@code null} when nothing is set and
 *         {@code applied} is {@code false}
 */
protected CompletableFuture<DispatchRate> internalGetSubscriptionDispatchRate(boolean applied, boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenApply(policies -> policies.map(TopicPolicies::getSubscriptionDispatchRate)
                    .orElseGet(() -> {
                        if (!applied) {
                            return null;
                        }
                        final DispatchRateImpl fromNamespace = getNamespacePolicies(namespaceName)
                                .subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
                        if (fromNamespace != null) {
                            return fromNamespace;
                        }
                        return subscriptionDispatchRate();
                    }));
}
/**
 * Stores the topic-level subscription dispatch rate; does nothing when {@code dispatchRate} is
 * {@code null}.
 *
 * @param dispatchRate the rate to store
 * @param isGlobal     selects which topic policies are read and is recorded on the stored policies
 * @return a future completed once the policies were updated
 */
protected CompletableFuture<Void> internalSetSubscriptionDispatchRate(
        DispatchRateImpl dispatchRate, boolean isGlobal) {
    if (dispatchRate == null) {
        return CompletableFuture.completedFuture(null);
    }
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        final TopicPolicies policies = existing.orElseGet(TopicPolicies::new);
        policies.setIsGlobal(isGlobal);
        policies.setSubscriptionDispatchRate(dispatchRate);
        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
    });
}
/**
 * Clears the topic-level subscription dispatch rate; does nothing when the topic has no policies.
 *
 * @param isGlobal selects which topic policies are read and is recorded on the stored policies
 * @return a future completed once the policies were updated
 */
protected CompletableFuture<Void> internalRemoveSubscriptionDispatchRate(boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        if (existing.isPresent()) {
            final TopicPolicies policies = existing.get();
            policies.setIsGlobal(isGlobal);
            policies.setSubscriptionDispatchRate(null);
            return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
        }
        return CompletableFuture.completedFuture(null);
    });
}
/**
 * Reads the dispatch rate configured for a single subscription of the topic.
 *
 * @param subName  subscription whose rate is requested
 * @param applied  when {@code true} and the subscription has no rate, fall back to
 *                 {@code internalGetSubscriptionDispatchRate(true, isGlobal)}
 * @param isGlobal selects which topic policies are read
 * @return a future with the resolved rate, possibly {@code null}
 */
protected CompletableFuture<DispatchRate> internalGetSubscriptionLevelDispatchRate(String subName, boolean applied,
                                                                                   boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenCompose(policies -> {
                final DispatchRateImpl subscriptionRate = policies
                        .map(tp -> tp.getSubscriptionPolicies().get(subName))
                        .map(SubscriptionPolicies::getDispatchRate)
                        .orElse(null);
                if (subscriptionRate != null || !applied) {
                    return CompletableFuture.completedFuture(subscriptionRate);
                }
                return internalGetSubscriptionDispatchRate(true, isGlobal);
            });
}
/**
 * Stores the dispatch rate of a single subscription, creating its subscription policies when
 * missing. Does nothing when {@code DispatchRateImpl.normalize(dispatchRate)} yields {@code null}.
 *
 * @param subName      subscription the rate applies to
 * @param dispatchRate the rate to normalize and store
 * @param isGlobal     selects which topic policies are read and is recorded on the stored policies
 * @return a future completed once the policies were updated
 */
protected CompletableFuture<Void> internalSetSubscriptionLevelDispatchRate(String subName,
                                                                           DispatchRateImpl dispatchRate,
                                                                           boolean isGlobal) {
    final DispatchRateImpl normalized = DispatchRateImpl.normalize(dispatchRate);
    if (normalized == null) {
        return CompletableFuture.completedFuture(null);
    }
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        final TopicPolicies policies = existing.orElseGet(TopicPolicies::new);
        policies.setIsGlobal(isGlobal);
        final SubscriptionPolicies subscriptionPolicies = policies.getSubscriptionPolicies()
                .computeIfAbsent(subName, key -> new SubscriptionPolicies());
        subscriptionPolicies.setDispatchRate(normalized);
        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
    });
}
/**
 * Clears the dispatch rate of a single subscription and drops its subscription policies when they
 * become empty. Does nothing when the topic has no policies or the subscription has none.
 *
 * @param subName  subscription whose rate is removed
 * @param isGlobal selects which topic policies are read and is recorded on the stored policies
 * @return a future completed once the policies were updated
 */
protected CompletableFuture<Void> internalRemoveSubscriptionLevelDispatchRate(String subName, boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        final SubscriptionPolicies subscriptionPolicies = existing.isPresent()
                ? existing.get().getSubscriptionPolicies().get(subName)
                : null;
        if (subscriptionPolicies == null) {
            return CompletableFuture.completedFuture(null);
        }
        final TopicPolicies policies = existing.get();
        subscriptionPolicies.setDispatchRate(null);
        if (subscriptionPolicies.checkEmpty()) {
            // cleanup empty SubscriptionPolicies
            policies.getSubscriptionPolicies().remove(subName, subscriptionPolicies);
        }
        policies.setIsGlobal(isGlobal);
        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
    });
}
/**
 * Reads the topic-level limit of consumers per subscription.
 *
 * @param isGlobal selects which topic policies are read
 * @return a future with the limit, empty when no policies exist or no limit is set
 */
protected CompletableFuture<Optional<Integer>> internalGetMaxConsumersPerSubscription(boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenApply(policies -> policies.map(p -> p.getMaxConsumersPerSubscription()));
}
/**
 * Stores the topic-level limit of consumers per subscription.
 *
 * @param maxConsumersPerSubscription the limit to store; may be {@code null}
 * @param isGlobal                    selects which topic policies are read and is recorded on the
 *                                    stored policies
 * @return a future completed once the policies were updated
 * @throws RestException with status PRECONDITION_FAILED, thrown synchronously, for a negative limit
 */
protected CompletableFuture<Void> internalSetMaxConsumersPerSubscription(
        Integer maxConsumersPerSubscription, boolean isGlobal) {
    final boolean negative = maxConsumersPerSubscription != null && maxConsumersPerSubscription < 0;
    if (negative) {
        throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for maxConsumersPerSubscription");
    }
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        final TopicPolicies policies = existing.orElseGet(TopicPolicies::new);
        policies.setIsGlobal(isGlobal);
        policies.setMaxConsumersPerSubscription(maxConsumersPerSubscription);
        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
    });
}
/**
 * Clears the topic-level limit of consumers per subscription; does nothing when the topic has no
 * policies.
 *
 * @param isGlobal selects which topic policies are read and is recorded on the stored policies
 * @return a future completed once the policies were updated
 */
protected CompletableFuture<Void> internalRemoveMaxConsumersPerSubscription(boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        if (existing.isPresent()) {
            final TopicPolicies policies = existing.get();
            policies.setMaxConsumersPerSubscription(null);
            policies.setIsGlobal(isGlobal);
            return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
        }
        return CompletableFuture.completedFuture(null);
    });
}
/**
 * Reads the topic-level compaction threshold.
 *
 * @param applied  when {@code true} and no topic-level threshold is set, fall back to the namespace
 *                 threshold and then to the broker's configured compaction threshold in bytes
 * @param isGlobal selects which topic policies are read
 * @return a future with the resolved threshold, or {@code null} when nothing is set and
 *         {@code applied} is {@code false}
 */
protected CompletableFuture<Long> internalGetCompactionThreshold(boolean applied, boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenApply(policies -> policies.map(TopicPolicies::getCompactionThreshold)
                    .orElseGet(() -> {
                        if (!applied) {
                            return null;
                        }
                        final Long fromNamespace = getNamespacePolicies(namespaceName).compaction_threshold;
                        if (fromNamespace != null) {
                            return fromNamespace;
                        }
                        return pulsar().getConfiguration().getBrokerServiceCompactionThresholdInBytes();
                    }));
}
/**
 * Stores the topic-level compaction threshold.
 *
 * @param compactionThreshold the threshold to store; may be {@code null}
 * @param isGlobal            selects which topic policies are read and is recorded on the stored
 *                            policies
 * @return a future completed once the policies were updated
 * @throws RestException with status PRECONDITION_FAILED, thrown synchronously, for a negative value
 */
protected CompletableFuture<Void> internalSetCompactionThreshold(Long compactionThreshold, boolean isGlobal) {
    final boolean negative = compactionThreshold != null && compactionThreshold < 0;
    if (negative) {
        throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for compactionThreshold");
    }
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        final TopicPolicies policies = existing.orElseGet(TopicPolicies::new);
        policies.setIsGlobal(isGlobal);
        policies.setCompactionThreshold(compactionThreshold);
        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
    });
}
/**
 * Clears the topic-level compaction threshold; does nothing when the topic has no policies.
 *
 * @param isGlobal selects which topic policies are read and is recorded on the stored policies
 * @return a future completed once the policies were updated
 */
protected CompletableFuture<Void> internalRemoveCompactionThreshold(boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        if (existing.isPresent()) {
            final TopicPolicies policies = existing.get();
            policies.setIsGlobal(isGlobal);
            policies.setCompactionThreshold(null);
            return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
        }
        return CompletableFuture.completedFuture(null);
    });
}
/**
 * Reads the topic-level publish rate.
 *
 * @param isGlobal selects which topic policies are read
 * @return a future with the rate, empty when no policies exist or no rate is set
 */
protected CompletableFuture<Optional<PublishRate>> internalGetPublishRate(boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenApply(policies -> policies.map(p -> p.getPublishRate()));
}
/**
 * Stores the topic-level publish rate; does nothing when {@code publishRate} is {@code null}.
 *
 * @param publishRate the rate to store
 * @param isGlobal    selects which topic policies are read and is recorded on the stored policies
 * @return a future completed once the policies were updated
 */
protected CompletableFuture<Void> internalSetPublishRate(PublishRate publishRate, boolean isGlobal) {
    if (publishRate == null) {
        return CompletableFuture.completedFuture(null);
    }
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        final TopicPolicies policies = existing.orElseGet(TopicPolicies::new);
        policies.setIsGlobal(isGlobal);
        policies.setPublishRate(publishRate);
        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
    });
}
/**
 * Reads the subscription types enabled at topic level.
 *
 * @param isGlobal selects which topic policies are read
 * @return a future with the enabled types, empty when no policies exist or nothing is set
 */
protected CompletableFuture<Optional<List<SubType>>> internalGetSubscriptionTypesEnabled(boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenApply(policies -> policies.map(p -> p.getSubscriptionTypesEnabled()));
}
/**
 * Stores the subscription types enabled at topic level.
 *
 * @param subscriptionTypesEnabled the types to enable; each is converted to the {@code SubType}
 *                                 constant with the same name
 * @param isGlobal                 selects which topic policies are read and is recorded on the
 *                                 stored policies
 * @return a future completed once the policies were updated
 */
protected CompletableFuture<Void> internalSetSubscriptionTypesEnabled(
        Set<SubscriptionType> subscriptionTypesEnabled, boolean isGlobal) {
    final List<SubType> converted = new ArrayList<>();
    for (SubscriptionType type : subscriptionTypesEnabled) {
        converted.add(SubType.valueOf(type.name()));
    }
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        final TopicPolicies policies = existing.orElseGet(TopicPolicies::new);
        policies.setIsGlobal(isGlobal);
        policies.setSubscriptionTypesEnabled(converted);
        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
    });
}
/**
 * Resets the subscription types enabled at topic level to an empty list; does nothing when the
 * topic has no policies.
 *
 * @param isGlobal selects which topic policies are read and is recorded on the stored policies
 * @return a future completed once the policies were updated
 */
protected CompletableFuture<Void> internalRemoveSubscriptionTypesEnabled(boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        if (existing.isPresent()) {
            final TopicPolicies policies = existing.get();
            policies.setSubscriptionTypesEnabled(Lists.newArrayList());
            policies.setIsGlobal(isGlobal);
            return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
        }
        return CompletableFuture.completedFuture(null);
    });
}
/**
 * Clears the topic-level publish rate; does nothing when the topic has no policies.
 *
 * @param isGlobal selects which topic policies are read and is recorded on the stored policies
 * @return a future completed once the policies were updated
 */
protected CompletableFuture<Void> internalRemovePublishRate(boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        if (existing.isPresent()) {
            final TopicPolicies policies = existing.get();
            policies.setPublishRate(null);
            policies.setIsGlobal(isGlobal);
            return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
        }
        return CompletableFuture.completedFuture(null);
    });
}
/**
 * Reads the topic-level subscribe rate.
 *
 * @param applied  when {@code true} and no topic-level rate is set, fall back to the namespace rate
 *                 for the local cluster and then to {@code subscribeRate()}
 * @param isGlobal selects which topic policies are read
 * @return a future with the resolved rate, or {@code null} when nothing is set and
 *         {@code applied} is {@code false}
 */
protected CompletableFuture<SubscribeRate> internalGetSubscribeRate(boolean applied, boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
            .thenApply(policies -> policies.map(TopicPolicies::getSubscribeRate)
                    .orElseGet(() -> {
                        if (!applied) {
                            return null;
                        }
                        final SubscribeRate fromNamespace = getNamespacePolicies(namespaceName)
                                .clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName());
                        if (fromNamespace != null) {
                            return fromNamespace;
                        }
                        return subscribeRate();
                    }));
}
/**
 * Stores the topic-level subscribe rate; does nothing when {@code subscribeRate} is {@code null}.
 *
 * @param subscribeRate the rate to store
 * @param isGlobal      selects which topic policies are read and is recorded on the stored policies
 * @return a future completed once the policies were updated
 */
protected CompletableFuture<Void> internalSetSubscribeRate(SubscribeRate subscribeRate, boolean isGlobal) {
    if (subscribeRate == null) {
        return CompletableFuture.completedFuture(null);
    }
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        final TopicPolicies policies = existing.orElseGet(TopicPolicies::new);
        policies.setIsGlobal(isGlobal);
        policies.setSubscribeRate(subscribeRate);
        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
    });
}
/**
 * Clears the topic-level subscribe rate; does nothing when the topic has no policies.
 *
 * @param isGlobal selects which topic policies are read and is recorded on the stored policies
 * @return a future completed once the policies were updated
 */
protected CompletableFuture<Void> internalRemoveSubscribeRate(boolean isGlobal) {
    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(existing -> {
        if (existing.isPresent()) {
            final TopicPolicies policies = existing.get();
            policies.setSubscribeRate(null);
            policies.setIsGlobal(isGlobal);
            return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
        }
        return CompletableFuture.completedFuture(null);
    });
}
/**
 * Reports a failed topic-policy operation to the client.
 *
 * <p>The failure is logged unless it is a {@link WebApplicationException} with status 307 or 404.
 * The response is then resumed exceptionally.
 *
 * @param methodName    name of the operation, used in the log message
 * @param thr           the failure, typically a wrapper whose cause is the real error
 * @param asyncResponse response to resume with the error
 */
protected void handleTopicPolicyException(String methodName, Throwable thr, AsyncResponse asyncResponse) {
    // Fall back to the throwable itself when it carries no cause; otherwise the real error would be
    // lost and null would be handed to the response.
    final Throwable cause = thr.getCause() != null ? thr.getCause() : thr;
    boolean expected = false;
    if (cause instanceof WebApplicationException) {
        final int status = ((WebApplicationException) cause).getResponse().getStatus();
        expected = status == 307 || status == 404;
    }
    if (!expected) {
        log.error("[{}] Failed to perform {} on topic {}",
                clientAppId(), methodName, topicName, cause);
    }
    resumeAsyncResponseExceptionally(asyncResponse, cause);
}
/**
 * Truncates the current topic on this broker after validating tenant admin access and topic
 * ownership.
 *
 * @param authoritative passed through to the topic ownership validation
 * @return the future returned by the topic's {@code truncate()} call
 */
protected CompletableFuture<Void> internalTruncateNonPartitionedTopicAsync(boolean authoritative) {
    return validateAdminAccessForTenantAsync(topicName.getTenant())
            .thenCompose(ignore -> validateTopicOwnershipAsync(topicName, authoritative))
            .thenCompose(ignore -> getTopicReferenceAsync(topicName))
            .thenCompose(topic -> topic.truncate());
}
/**
 * Truncates the current topic. A partitioned topic is truncated by issuing one truncate call per
 * partition through the admin client; anything else is truncated directly.
 *
 * @param authoritative passed through to the metadata lookup and the ownership validation
 * @return a future completed when every truncate call finished
 */
protected CompletableFuture<Void> internalTruncateTopicAsync(boolean authoritative) {
    // If the topic name is a partition name, no need to get partition topic metadata again
    if (topicName.isPartitioned()) {
        return internalTruncateNonPartitionedTopicAsync(authoritative);
    }
    return getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenCompose(meta -> {
        if (meta.partitions <= 0) {
            return internalTruncateNonPartitionedTopicAsync(authoritative);
        }
        final List<CompletableFuture<Void>> pending = new ArrayList<>(meta.partitions);
        for (int idx = 0; idx < meta.partitions; idx++) {
            final TopicName partition = topicName.getPartition(idx);
            try {
                pending.add(pulsar().getAdminClient().topics().truncateAsync(partition.toString()));
            } catch (Exception e) {
                log.error("[{}] Failed to truncate topic {}", clientAppId(), partition, e);
                return FutureUtil.failedFuture(new RestException(e));
            }
        }
        return FutureUtil.waitForAll(pending);
    });
}
/**
 * Enables or disables the replicated state of a subscription and resumes {@code asyncResponse}
 * with the outcome.
 *
 * <p>Non-persistent and non-global topics are rejected with METHOD_NOT_ALLOWED. For a partitioned
 * topic the change is forwarded to every partition through the admin client; otherwise it is applied
 * by {@code internalSetReplicatedSubscriptionStatusForNonPartitionedTopic}.
 *
 * @param asyncResponse response to resume with "no content" on success or with the error
 * @param subName       subscription whose replicated status is changed
 * @param authoritative passed through to the metadata lookup and the per-topic handler
 * @param enabled       the new replicated status
 */
protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
boolean authoritative, boolean enabled) {
log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
topicName, subName);
// Reject the request if the topic is not persistent
if (!topicName.isPersistent()) {
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
"Cannot enable/disable replicated subscriptions on non-persistent topics"));
return;
}
// Reject the request if the topic is not global
if (!topicName.isGlobal()) {
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
"Cannot enable/disable replicated subscriptions on non-global topics"));
return;
}
// 1.Permission to consume this topic is required
// 2.Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters
CompletableFuture<Void> validateFuture =
validateTopicOperationAsync(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName)
.thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(namespaceName));
CompletableFuture<Void> resultFuture;
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
resultFuture = validateFuture.thenAccept(
__ -> internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName,
authoritative, enabled));
} else {
resultFuture = validateFuture.
thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
.thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
// Partitioned topic: forward the change to each partition via the admin client.
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics().setReplicatedSubscriptionStatusAsync(
topicNamePartition.toString(), subName, enabled));
} catch (Exception e) {
// The response is resumed right away; requests already issued for earlier partitions
// are not awaited.
log.warn("[{}] Failed to change replicated subscription status to {} - {} {}",
clientAppId(), enabled, topicNamePartition, subName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
}
// Map the cause of the aggregated failure to a REST status, or answer "no content" on success.
FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
asyncResponse
.resume(new RestException(Status.NOT_FOUND,
"Topic or subscription not found"));
return null;
} else if (t instanceof PreconditionFailedException) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Cannot enable/disable replicated subscriptions on non-global topics"));
return null;
} else {
log.warn("[{}] Failed to change replicated subscription status to {} - {} {}",
clientAppId(), enabled, topicName, subName, t);
asyncResponse.resume(new RestException(t));
return null;
}
}
asyncResponse.resume(Response.noContent().build());
return null;
});
} else {
// No partitions reported: handle the topic directly on this broker.
internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName,
authoritative, enabled);
}
});
}
// Failures of the validation or metadata stages end up here.
resultFuture.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
topicName, subName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
/**
 * Enable or disable replication on a single subscription of the topic named by {@code topicName}
 * and resume {@code asyncResponse} with the outcome.
 *
 * <p>The response is resumed with 404 when the topic or the subscription cannot be found, with 405
 * when either is not persistent, with 500 when {@code setReplicated} reports failure, and with
 * 204 (no content) on success.
 *
 * @param asyncResponse the pending REST response to resume
 * @param subName       name of the subscription to update
 * @param authoritative forwarded to the topic-ownership validation
 * @param enabled       the replicated state to apply to the subscription
 */
private void internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(
        AsyncResponse asyncResponse, String subName, boolean authoritative, boolean enabled) {
    // Redirect the request to the appropriate broker if this broker is not the owner of the topic
    validateTopicOwnershipAsync(topicName, authoritative)
            .thenCompose(ignored -> getTopicReferenceAsync(topicName))
            .thenAccept(topicRef -> {
                if (topicRef == null) {
                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
                            getTopicNotFoundErrorMessage(topicName.toString())));
                    return;
                }

                Subscription subscription = topicRef.getSubscription(subName);
                if (subscription == null) {
                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
                            getSubNotFoundErrorMessage(topicName.toString(), subName)));
                    return;
                }

                // Only persistent topics with persistent subscriptions can be switched.
                boolean persistent = topicRef instanceof PersistentTopic
                        && subscription instanceof PersistentSubscription;
                if (!persistent) {
                    asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
                            "Cannot enable/disable replicated subscriptions on non-persistent topics"));
                    return;
                }

                boolean updated = ((PersistentSubscription) subscription).setReplicated(enabled);
                if (!updated) {
                    asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR,
                            "Failed to update cursor properties"));
                    return;
                }

                ((PersistentTopic) topicRef).checkReplicatedSubscriptionControllerState();
                log.info("[{}] Changed replicated subscription status to {} - {} {}", clientAppId(),
                        enabled, topicName, subName);
                asyncResponse.resume(Response.noContent().build());
            })
            .exceptionally(error -> {
                // If the exception is not redirect exception we need to log it.
                if (!isRedirectException(error)) {
                    log.error("[{}] Failed to set replicated subscription status on {} {}", clientAppId(),
                            topicName, subName, error);
                }
                resumeAsyncResponseExceptionally(asyncResponse, error);
                return null;
            });
}
/**
 * Look up the replicated-subscription status of {@code subName} on {@code topicName} and resume
 * {@code asyncResponse} with the result.
 *
 * <p>Non-persistent and non-global topic names are rejected with 405 up front. For a partition
 * name, or a topic whose metadata reports no partitions, the lookup is delegated to
 * {@link #internalGetReplicatedSubscriptionStatusForNonPartitionedTopic}. For a partitioned
 * topic, every partition is queried through the admin client and the per-partition maps are
 * merged into a single response once all queries have succeeded.
 *
 * <p>The response is resumed exactly once on the partitioned path: per-partition callbacks only
 * log, and the aggregate handler maps the failure to 404 / 412 / generic error or returns the
 * merged map.
 *
 * @param asyncResponse the pending REST response to resume
 * @param subName       name of the subscription to inspect
 * @param authoritative forwarded to metadata lookup and ownership validation
 */
protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncResponse,
                                                       String subName,
                                                       boolean authoritative) {
    log.info("[{}] Attempting to get replicated subscription status on {} {}", clientAppId(), topicName, subName);

    // Reject the request if the topic is not persistent
    if (!topicName.isPersistent()) {
        asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
                "Cannot get replicated subscriptions on non-persistent topics"));
        return;
    }

    // Reject the request if the topic is not global
    if (!topicName.isGlobal()) {
        asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
                "Cannot get replicated subscriptions on non-global topics"));
        return;
    }

    // Permission to consume this topic is required
    CompletableFuture<Void> validateFuture =
            validateTopicOperationAsync(topicName, TopicOperation.GET_REPLICATED_SUBSCRIPTION_STATUS, subName);

    CompletableFuture<Void> resultFuture;
    // If the topic name is a partition name, no need to get partition topic metadata again
    if (topicName.isPartitioned()) {
        resultFuture = validateFuture.thenAccept(
                __ -> internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse,
                        subName, authoritative));
    } else {
        resultFuture = validateFuture
                .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
                .thenAccept(partitionMetadata -> {
                    if (partitionMetadata.partitions > 0) {
                        final List<CompletableFuture<Map<String, Boolean>>> futures =
                                Lists.newArrayListWithCapacity(partitionMetadata.partitions);

                        for (int i = 0; i < partitionMetadata.partitions; i++) {
                            TopicName partition = topicName.getPartition(i);
                            try {
                                // The callback only records which partition failed. Resuming the
                                // response or touching shared state here would race with the
                                // aggregate handler below and hide its status-code mapping.
                                futures.add(pulsar().getAdminClient().topics()
                                        .getReplicatedSubscriptionStatusAsync(partition.toString(), subName)
                                        .whenComplete((response, throwable) -> {
                                            if (throwable != null) {
                                                log.error("[{}] Failed to get replicated subscriptions on {} {}",
                                                        clientAppId(), partition, subName, throwable);
                                            }
                                        }));
                            } catch (Exception e) {
                                log.warn("[{}] Failed to get replicated subscription status on {} {}",
                                        clientAppId(), partition, subName, e);
                                // Propagates to resultFuture.exceptionally(...) below.
                                throw new RestException(e);
                            }
                        }

                        FutureUtil.waitForAll(futures).handle((result, exception) -> {
                            if (exception != null) {
                                Throwable t = FutureUtil.unwrapCompletionException(exception);
                                if (t instanceof NotFoundException) {
                                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
                                            "Topic or subscription not found"));
                                } else if (t instanceof PreconditionFailedException) {
                                    asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
                                            "Cannot get replicated subscriptions on non-global topics"));
                                } else {
                                    log.error("[{}] Failed to get replicated subscription status on {} {}",
                                            clientAppId(), topicName, subName, t);
                                    asyncResponse.resume(new RestException(t));
                                }
                                // Do not fall through to the success path after an error.
                                return null;
                            }

                            // Every future has completed successfully, so join() neither blocks
                            // nor throws; merging here keeps the map confined to one thread.
                            final Map<String, Boolean> status = Maps.newHashMap();
                            for (CompletableFuture<Map<String, Boolean>> future : futures) {
                                Map<String, Boolean> partitionStatus = future.join();
                                if (partitionStatus != null) {
                                    status.putAll(partitionStatus);
                                }
                            }
                            asyncResponse.resume(status);
                            return null;
                        });
                    } else {
                        internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName,
                                authoritative);
                    }
                });
    }

    resultFuture.exceptionally(ex -> {
        // If the exception is not redirect exception we need to log it.
        if (!isRedirectException(ex)) {
            log.error("[{}] Failed to get replicated subscription status on {} {}", clientAppId(),
                    topicName, subName, ex);
        }
        resumeAsyncResponseExceptionally(asyncResponse, ex);
        return null;
    });
}
/**
 * Resolve the replicated-subscription status of {@code subName} on the single topic named by
 * {@code topicName} and resume {@code asyncResponse} with a one-entry map of
 * {@code topicName.toString()} to the subscription's replicated flag.
 *
 * <p>The response is resumed with 404 when the topic or subscription cannot be found and with 405
 * when either is not persistent.
 *
 * @param asyncResponse the pending REST response to resume
 * @param subName       name of the subscription to inspect
 * @param authoritative forwarded to the topic-ownership validation
 */
private void internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(
        AsyncResponse asyncResponse,
        String subName,
        boolean authoritative) {
    // Redirect the request to the appropriate broker if this broker is not the owner of the topic
    validateTopicOwnershipAsync(topicName, authoritative)
            .thenCompose(__ -> getTopicReferenceAsync(topicName))
            .thenAccept(topic -> {
                if (topic == null) {
                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
                            getTopicNotFoundErrorMessage(topicName.toString())));
                    return;
                }

                Subscription sub = topic.getSubscription(subName);
                if (sub == null) {
                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
                            getSubNotFoundErrorMessage(topicName.toString(), subName)));
                    return;
                }

                if (topic instanceof PersistentTopic && sub instanceof PersistentSubscription) {
                    // Typed map (was a raw Map) so the payload shape matches the partitioned path.
                    Map<String, Boolean> res = Maps.newHashMap();
                    res.put(topicName.toString(), sub.isReplicated());
                    asyncResponse.resume(res);
                } else {
                    asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
                            "Cannot get replicated subscriptions on non-persistent topics"));
                }
            })
            .exceptionally(e -> {
                // If the exception is not redirect exception we need to log it; this matches
                // the handling used by the set-status counterpart of this method.
                if (!isRedirectException(e)) {
                    Throwable cause = FutureUtil.unwrapCompletionException(e);
                    log.error("[{}] Failed to get replicated subscription status on {} {}", clientAppId(),
                            topicName, subName, cause);
                }
                resumeAsyncResponseExceptionally(asyncResponse, e);
                return null;
            });
}
/**
 * Fetch the schema compatibility strategy for {@code topicName}.
 *
 * @param applied when {@code true}, the result of {@code getSchemaCompatibilityStrategyAsync()}
 *                is returned as-is; when {@code false}, only the topic-level policy is consulted
 *                after the READ policy operation has been validated
 * @return a future holding the strategy; for the non-applied case it holds {@code null} when no
 *         topic policies exist or the stored strategy is reported as undefined
 */
protected CompletableFuture<SchemaCompatibilityStrategy> internalGetSchemaCompatibilityStrategy(boolean applied) {
    if (!applied) {
        return validateTopicPolicyOperationAsync(topicName,
                PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
                PolicyOperation.READ)
                .thenCompose(ignored -> getTopicPoliciesAsyncWithRetry(topicName))
                .thenApply(topicPolicies -> {
                    if (topicPolicies.isPresent()) {
                        SchemaCompatibilityStrategy configured =
                                topicPolicies.get().getSchemaCompatibilityStrategy();
                        if (!SchemaCompatibilityStrategy.isUndefined(configured)) {
                            return configured;
                        }
                    }
                    // Nothing set at topic level.
                    return null;
                });
    }
    return getSchemaCompatibilityStrategyAsync();
}
/**
 * Store the schema compatibility strategy in the topic policies of {@code topicName} after the
 * WRITE policy operation has been validated.
 *
 * @param strategy the strategy to store; {@code SchemaCompatibilityStrategy.UNDEFINED} is
 *                 persisted as {@code null}
 * @return a future completing when the topic-policies update has finished
 */
protected CompletableFuture<Void> internalSetSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) {
    // UNDEFINED is normalised to null before being written to the policies.
    final SchemaCompatibilityStrategy valueToStore;
    if (strategy == SchemaCompatibilityStrategy.UNDEFINED) {
        valueToStore = null;
    } else {
        valueToStore = strategy;
    }

    return validateTopicPolicyOperationAsync(topicName,
            PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
            PolicyOperation.WRITE)
            .thenCompose(ignored -> getTopicPoliciesAsyncWithRetry(topicName))
            .thenCompose(existing -> {
                // Start from the current policies, or a fresh instance when none exist yet.
                TopicPolicies updated = existing.orElseGet(TopicPolicies::new);
                updated.setSchemaCompatibilityStrategy(valueToStore);
                return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, updated);
            });
}
/**
 * Read the schema-validation-enforced flag for {@code topicName}.
 *
 * <p>A value present in the topic policies always wins. Otherwise the result is {@code false}
 * unless {@code applied} is set, in which case the namespace policy is consulted first and the
 * broker configuration is used only when the namespace policy is not enabled.
 *
 * @param applied whether to fall back to namespace and broker settings when the topic has no
 *                value of its own
 * @return a future holding the resolved flag
 */
protected CompletableFuture<Boolean> internalGetSchemaValidationEnforced(boolean applied) {
    return getTopicPoliciesAsyncWithRetry(topicName).thenApply(topicPolicies -> {
        Optional<Boolean> topicLevel = topicPolicies.map(TopicPolicies::getSchemaValidationEnforced);
        if (topicLevel.isPresent()) {
            return topicLevel.get();
        }
        if (!applied) {
            return false;
        }
        // Namespace policy first; the broker-wide setting is only read when it is off.
        if (getNamespacePolicies(namespaceName).schema_validation_enforced) {
            return true;
        }
        return pulsar().getConfiguration().isSchemaValidationEnforced();
    });
}
/**
 * Write the schema-validation-enforced flag into the topic policies of {@code topicName}.
 *
 * @param schemaValidationEnforced the value to store
 * @return a future completing when the topic-policies update has finished
 */
protected CompletableFuture<Void> internalSetSchemaValidationEnforced(boolean schemaValidationEnforced) {
    return getTopicPoliciesAsyncWithRetry(topicName).thenCompose(existing -> {
        // Reuse the current policies when present, otherwise begin with an empty set.
        final TopicPolicies updated = existing.isPresent() ? existing.get() : new TopicPolicies();
        updated.setSchemaValidationEnforced(schemaValidationEnforced);
        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, updated);
    });
}
}