| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.pulsar.broker.admin.impl; |
| |
| import static org.apache.commons.lang3.StringUtils.isBlank; |
| import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; |
| import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT; |
| import static org.apache.pulsar.common.policies.data.Policies.getBundles; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Sets; |
| import com.google.common.collect.Sets.SetView; |
| import java.lang.reflect.Field; |
| import java.net.URI; |
| import java.net.URL; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.SortedSet; |
| import java.util.TreeSet; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ExecutionException; |
| import java.util.function.Function; |
| import java.util.stream.Collectors; |
| import javax.ws.rs.WebApplicationException; |
| import javax.ws.rs.container.AsyncResponse; |
| import javax.ws.rs.core.Response; |
| import javax.ws.rs.core.Response.Status; |
| import javax.ws.rs.core.UriBuilder; |
| import org.apache.commons.lang.mutable.MutableObject; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.pulsar.broker.PulsarServerException; |
| import org.apache.pulsar.broker.admin.AdminResource; |
| import org.apache.pulsar.broker.authorization.AuthorizationService; |
| import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; |
| import org.apache.pulsar.broker.service.Subscription; |
| import org.apache.pulsar.broker.service.Topic; |
| import org.apache.pulsar.broker.service.persistent.PersistentReplicator; |
| import org.apache.pulsar.broker.service.persistent.PersistentTopic; |
| import org.apache.pulsar.broker.systopic.SystemTopicClient; |
| import org.apache.pulsar.broker.web.RestException; |
| import org.apache.pulsar.client.admin.PulsarAdminException; |
| import org.apache.pulsar.client.api.SubscriptionType; |
| import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; |
| import org.apache.pulsar.common.naming.NamedEntity; |
| import org.apache.pulsar.common.naming.NamespaceBundle; |
| import org.apache.pulsar.common.naming.NamespaceBundleFactory; |
| import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; |
| import org.apache.pulsar.common.naming.NamespaceBundles; |
| import org.apache.pulsar.common.naming.NamespaceName; |
| import org.apache.pulsar.common.naming.TopicDomain; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.common.policies.data.AuthAction; |
| import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride; |
| import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; |
| import org.apache.pulsar.common.policies.data.BacklogQuota; |
| import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; |
| import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; |
| import org.apache.pulsar.common.policies.data.BundlesData; |
| import org.apache.pulsar.common.policies.data.ClusterData; |
| import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; |
| import org.apache.pulsar.common.policies.data.DispatchRate; |
| import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; |
| import org.apache.pulsar.common.policies.data.LocalPolicies; |
| import org.apache.pulsar.common.policies.data.NamespaceOperation; |
| import org.apache.pulsar.common.policies.data.OffloadPolicies; |
| import org.apache.pulsar.common.policies.data.PersistencePolicies; |
| import org.apache.pulsar.common.policies.data.Policies; |
| import org.apache.pulsar.common.policies.data.PolicyName; |
| import org.apache.pulsar.common.policies.data.PolicyOperation; |
| import org.apache.pulsar.common.policies.data.PublishRate; |
| import org.apache.pulsar.common.policies.data.RetentionPolicies; |
| import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy; |
| import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; |
| import org.apache.pulsar.common.policies.data.SubscribeRate; |
| import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; |
| import org.apache.pulsar.common.policies.data.TenantOperation; |
| import org.apache.pulsar.common.util.FutureUtil; |
| import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException; |
| import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException; |
| import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.data.Stat; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public abstract class NamespacesBase extends AdminResource { |
| |
| protected List<String> internalGetTenantNamespaces(String tenant) { |
| checkNotNull(tenant, "Tenant should not be null"); |
| try { |
| NamedEntity.checkName(tenant); |
| } catch (IllegalArgumentException e) { |
| log.warn("[{}] Tenant name is invalid {}", clientAppId(), tenant, e); |
| throw new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid"); |
| } |
| validateTenantOperation(tenant, TenantOperation.LIST_NAMESPACES); |
| |
| try { |
| return getListOfNamespaces(tenant); |
| } catch (KeeperException.NoNodeException e) { |
| log.warn("[{}] Failed to get namespace list for tenant: {} - Does not exist", clientAppId(), tenant); |
| throw new RestException(Status.NOT_FOUND, "Property does not exist"); |
| } catch (Exception e) { |
| log.error("[{}] Failed to get namespaces list: {}", clientAppId(), e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected void internalCreateNamespace(Policies policies) { |
| validateTenantOperation(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE); |
| validatePoliciesReadOnlyAccess(); |
| validatePolicies(namespaceName, policies); |
| |
| try { |
| int maxNamespacesPerTenant = pulsar().getConfiguration().getMaxNamespacesPerTenant(); |
| // no distributed locks are added here.In a concurrent scenario, the threshold will be exceeded. |
| if (maxNamespacesPerTenant > 0) { |
| List<String> namespaces = getListOfNamespaces(namespaceName.getTenant()); |
| if (namespaces != null && namespaces.size() > maxNamespacesPerTenant) { |
| throw new RestException(Status.PRECONDITION_FAILED, |
| "Exceed the maximum number of namespace in tenant :" + namespaceName.getTenant()); |
| } |
| } |
| namespaceResources().create(path(POLICIES, namespaceName.toString()), policies); |
| log.info("[{}] Created namespace {}", clientAppId(), namespaceName); |
| } catch (AlreadyExistsException e) { |
| log.warn("[{}] Failed to create namespace {} - already exists", clientAppId(), namespaceName); |
| throw new RestException(Status.CONFLICT, "Namespace already exists"); |
| } catch (Exception e) { |
| log.error("[{}] Failed to create namespace {}", clientAppId(), namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative, boolean force) { |
| if (force) { |
| internalDeleteNamespaceForcefully(asyncResponse, authoritative); |
| } else { |
| internalDeleteNamespace(asyncResponse, authoritative); |
| } |
| } |
| |
| @SuppressWarnings("deprecation") |
| protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative) { |
| validateTenantOperation(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE); |
| validatePoliciesReadOnlyAccess(); |
| |
| // ensure that non-global namespace is directed to the correct cluster |
| if (!namespaceName.isGlobal()) { |
| validateClusterOwnership(namespaceName.getCluster()); |
| } |
| |
| Entry<Policies, Stat> policiesNode1 = null; |
| Policies policies = null; |
| |
| // ensure the local cluster is the only cluster for the global namespace configuration |
| try { |
| policies = namespaceResources().get(path(POLICIES, namespaceName.toString())).orElseThrow( |
| () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist.")); |
| if (namespaceName.isGlobal()) { |
| if (policies.replication_clusters.size() > 1) { |
| // There are still more than one clusters configured for the global namespace |
| throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace " |
| + namespaceName + ". There are still more than one replication clusters configured."); |
| } |
| if (policies.replication_clusters.size() == 1 |
| && !policies.replication_clusters.contains(config().getClusterName())) { |
| // the only replication cluster is other cluster, redirect |
| String replCluster = Lists.newArrayList(policies.replication_clusters).get(0); |
| ClusterData replClusterData = clusterResources().get(AdminResource.path("clusters", replCluster)) |
| .orElseThrow(() -> new RestException(Status.NOT_FOUND, |
| "Cluster " + replCluster + " does not exist")); |
| URL replClusterUrl; |
| if (!config().isTlsEnabled() || !isRequestHttps()) { |
| replClusterUrl = new URL(replClusterData.getServiceUrl()); |
| } else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) { |
| replClusterUrl = new URL(replClusterData.getServiceUrlTls()); |
| } else { |
| throw new RestException(Status.PRECONDITION_FAILED, |
| "The replication cluster does not provide TLS encrypted service"); |
| } |
| URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost()) |
| .port(replClusterUrl.getPort()).replaceQueryParam("authoritative", false).build(); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Redirecting the rest call to {}: cluster={}", clientAppId(), redirect, |
| replCluster); |
| } |
| throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); |
| } |
| } |
| } catch (WebApplicationException wae) { |
| asyncResponse.resume(wae); |
| return; |
| } catch (Exception e) { |
| asyncResponse.resume(new RestException(e)); |
| return; |
| } |
| |
| boolean isEmpty; |
| List<String> topics; |
| try { |
| topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join(); |
| topics.addAll(getPartitionedTopicList(TopicDomain.persistent)); |
| topics.addAll(getPartitionedTopicList(TopicDomain.non_persistent)); |
| isEmpty = topics.isEmpty(); |
| |
| } catch (Exception e) { |
| asyncResponse.resume(new RestException(e)); |
| return; |
| } |
| |
| if (!isEmpty) { |
| if (log.isDebugEnabled()) { |
| log.debug("Found topics on namespace {}", namespaceName); |
| } |
| boolean hasNonSystemTopic = false; |
| for (String topic : topics) { |
| if (!SystemTopicClient.isSystemTopic(TopicName.get(topic))) { |
| hasNonSystemTopic = true; |
| break; |
| } |
| } |
| if (hasNonSystemTopic) { |
| asyncResponse.resume(new RestException(Status.CONFLICT, "Cannot delete non empty namespace")); |
| return; |
| } |
| } |
| |
| // set the policies to deleted so that somebody else cannot acquire this namespace |
| try { |
| namespaceResources().set(path(POLICIES, namespaceName.toString()), (old) -> { |
| old.deleted = true; |
| return old; |
| }); |
| } catch (Exception e) { |
| log.error("[{}] Failed to delete namespace on global ZK {}", clientAppId(), namespaceName, e); |
| asyncResponse.resume(new RestException(e)); |
| return; |
| } |
| |
| // remove from owned namespace map and ephemeral node from ZK |
| final List<CompletableFuture<Void>> futures = Lists.newArrayList(); |
| try { |
| // remove system topics first. |
| if (!topics.isEmpty()) { |
| for (String topic : topics) { |
| pulsar().getBrokerService().getTopicIfExists(topic).whenComplete((topicOptional, ex) -> { |
| topicOptional.ifPresent(systemTopic -> futures.add(systemTopic.deleteForcefully())); |
| }); |
| } |
| } |
| NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() |
| .getBundles(namespaceName); |
| for (NamespaceBundle bundle : bundles.getBundles()) { |
| // check if the bundle is owned by any broker, if not then we do not need to delete the bundle |
| if (pulsar().getNamespaceService().getOwner(bundle).isPresent()) { |
| futures.add(pulsar().getAdminClient().namespaces() |
| .deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange())); |
| } |
| } |
| } catch (Exception e) { |
| log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, e); |
| asyncResponse.resume(new RestException(e)); |
| return; |
| } |
| |
| FutureUtil.waitForAll(futures).handle((result, exception) -> { |
| if (exception != null) { |
| if (exception.getCause() instanceof PulsarAdminException) { |
| asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause())); |
| return null; |
| } else { |
| log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, exception); |
| asyncResponse.resume(new RestException(exception.getCause())); |
| return null; |
| } |
| } |
| |
| try { |
| // we have successfully removed all the ownership for the namespace, the policies znode can be deleted |
| // now |
| final String globalZkPolicyPath = path(POLICIES, namespaceName.toString()); |
| final String localZkPolicyPath = joinPath(LOCAL_POLICIES_ROOT, namespaceName.toString()); |
| namespaceResources().delete(globalZkPolicyPath); |
| try { |
| getLocalPolicies().delete(localZkPolicyPath); |
| } catch (NotFoundException nne) { |
| // If the z-node with the modified information is not there anymore, we're already good |
| } |
| } catch (Exception e) { |
| log.error("[{}] Failed to remove owned namespace {} from ZK", clientAppId(), namespaceName, e); |
| asyncResponse.resume(new RestException(e)); |
| return null; |
| } |
| |
| asyncResponse.resume(Response.noContent().build()); |
| return null; |
| }); |
| } |
| |
| @SuppressWarnings("deprecation") |
| protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, boolean authoritative) { |
| validateTenantOperation(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE); |
| validatePoliciesReadOnlyAccess(); |
| |
| if (!pulsar().getConfiguration().isForceDeleteNamespaceAllowed()) { |
| asyncResponse.resume( |
| new RestException(Status.METHOD_NOT_ALLOWED, "Broker doesn't allow forced deletion of namespaces")); |
| return; |
| } |
| |
| // ensure that non-global namespace is directed to the correct cluster |
| if (!namespaceName.isGlobal()) { |
| validateClusterOwnership(namespaceName.getCluster()); |
| } |
| |
| Policies policies = null; |
| |
| // ensure the local cluster is the only cluster for the global namespace configuration |
| try { |
| policies = namespaceResources().get(path(POLICIES, namespaceName.toString())).orElseThrow( |
| () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist.")); |
| if (namespaceName.isGlobal()) { |
| if (policies.replication_clusters.size() > 1) { |
| // There are still more than one clusters configured for the global namespace |
| throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace " |
| + namespaceName + ". There are still more than one replication clusters configured."); |
| } |
| if (policies.replication_clusters.size() == 1 |
| && !policies.replication_clusters.contains(config().getClusterName())) { |
| // the only replication cluster is other cluster, redirect |
| String replCluster = Lists.newArrayList(policies.replication_clusters).get(0); |
| ClusterData replClusterData = clusterResources().get(AdminResource.path("clusters", replCluster)) |
| .orElseThrow(() -> new RestException(Status.NOT_FOUND, |
| "Cluster " + replCluster + " does not exist")); |
| URL replClusterUrl; |
| if (!config().isTlsEnabled() || !isRequestHttps()) { |
| replClusterUrl = new URL(replClusterData.getServiceUrl()); |
| } else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) { |
| replClusterUrl = new URL(replClusterData.getServiceUrlTls()); |
| } else { |
| throw new RestException(Status.PRECONDITION_FAILED, |
| "The replication cluster does not provide TLS encrypted service"); |
| } |
| URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost()) |
| .port(replClusterUrl.getPort()).replaceQueryParam("authoritative", false).build(); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Redirecting the rest call to {}: cluster={}", clientAppId(), redirect, |
| replCluster); |
| } |
| throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); |
| } |
| } |
| } catch (WebApplicationException wae) { |
| asyncResponse.resume(wae); |
| return; |
| } catch (Exception e) { |
| asyncResponse.resume(new RestException(e)); |
| return; |
| } |
| |
| List<String> topics; |
| try { |
| topics = pulsar().getNamespaceService().getFullListOfTopics(namespaceName).join(); |
| } catch (Exception e) { |
| asyncResponse.resume(new RestException(e)); |
| return; |
| } |
| |
| // set the policies to deleted so that somebody else cannot acquire this namespace |
| try { |
| namespaceResources().set(path(POLICIES, namespaceName.toString()), (old) -> { |
| old.deleted = true; |
| return old; |
| }); |
| } catch (Exception e) { |
| log.error("[{}] Failed to delete namespace on global ZK {}", clientAppId(), namespaceName, e); |
| asyncResponse.resume(new RestException(e)); |
| return; |
| } |
| |
| // remove from owned namespace map and ephemeral node from ZK |
| final List<CompletableFuture<Void>> futures = Lists.newArrayList(); |
| try { |
| // firstly remove all topics including system topics |
| if (!topics.isEmpty()) { |
| for (String topic : topics) { |
| try { |
| futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true)); |
| } catch (Exception e) { |
| log.error("[{}] Failed to force delete topic {}", clientAppId(), topic, e); |
| asyncResponse.resume(new RestException(e)); |
| } |
| } |
| } |
| // forcefully delete namespace bundles |
| NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() |
| .getBundles(namespaceName); |
| for (NamespaceBundle bundle : bundles.getBundles()) { |
| // check if the bundle is owned by any broker, if not then we do not need to delete the bundle |
| if (pulsar().getNamespaceService().getOwner(bundle).isPresent()) { |
| futures.add(pulsar().getAdminClient().namespaces() |
| .deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange(), true)); |
| } |
| } |
| } catch (Exception e) { |
| log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, e); |
| asyncResponse.resume(new RestException(e)); |
| return; |
| } |
| |
| FutureUtil.waitForAll(futures).handle((result, exception) -> { |
| if (exception != null) { |
| if (exception.getCause() instanceof PulsarAdminException) { |
| asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause())); |
| return null; |
| } else { |
| log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, exception); |
| asyncResponse.resume(new RestException(exception.getCause())); |
| return null; |
| } |
| } |
| |
| try { |
| // remove partitioned topics znode |
| final String globalPartitionedPath = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString()); |
| // check whether partitioned topics znode exist |
| if (namespaceResources().exists(globalPartitionedPath)) { |
| deleteRecursive(namespaceResources(), globalPartitionedPath); |
| } |
| |
| // we have successfully removed all the ownership for the namespace, the policies znode can be deleted |
| // now |
| final String globalZkPolicyPath = path(POLICIES, namespaceName.toString()); |
| final String localZkPolicyPath = joinPath(LOCAL_POLICIES_ROOT, namespaceName.toString()); |
| namespaceResources().delete(globalZkPolicyPath); |
| |
| try { |
| getLocalPolicies().delete(localZkPolicyPath); |
| } catch (NotFoundException nne) { |
| // If the z-node with the modified information is not there anymore, we're already good |
| } |
| } catch (Exception e) { |
| log.error("[{}] Failed to remove owned namespace {} from ZK", clientAppId(), namespaceName, e); |
| asyncResponse.resume(new RestException(e)); |
| return null; |
| } |
| |
| asyncResponse.resume(Response.noContent().build()); |
| return null; |
| }); |
| } |
| |
| protected void internalDeleteNamespaceBundle(String bundleRange, boolean authoritative, boolean force) { |
| if (force) { |
| internalDeleteNamespaceBundleForcefully(bundleRange, authoritative); |
| } else { |
| internalDeleteNamespaceBundle(bundleRange, authoritative); |
| } |
| } |
| |
| @SuppressWarnings("deprecation") |
| protected void internalDeleteNamespaceBundle(String bundleRange, boolean authoritative) { |
| validateNamespaceOperation(namespaceName, NamespaceOperation.DELETE_BUNDLE); |
| validatePoliciesReadOnlyAccess(); |
| |
| // ensure that non-global namespace is directed to the correct cluster |
| if (!namespaceName.isGlobal()) { |
| validateClusterOwnership(namespaceName.getCluster()); |
| } |
| |
| Policies policies = getNamespacePolicies(namespaceName); |
| // ensure the local cluster is the only cluster for the global namespace configuration |
| try { |
| if (namespaceName.isGlobal()) { |
| if (policies.replication_clusters.size() > 1) { |
| // There are still more than one clusters configured for the global namespace |
| throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace " |
| + namespaceName + ". There are still more than one replication clusters configured."); |
| } |
| if (policies.replication_clusters.size() == 1 |
| && !policies.replication_clusters.contains(config().getClusterName())) { |
| // the only replication cluster is other cluster, redirect |
| String replCluster = Lists.newArrayList(policies.replication_clusters).get(0); |
| ClusterData replClusterData = clusterResources().get(AdminResource.path("clusters", replCluster)) |
| .orElseThrow(() -> new RestException(Status.NOT_FOUND, |
| "Cluster " + replCluster + " does not exist")); |
| URL replClusterUrl; |
| if (!config().isTlsEnabled() || !isRequestHttps()) { |
| replClusterUrl = new URL(replClusterData.getServiceUrl()); |
| } else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) { |
| replClusterUrl = new URL(replClusterData.getServiceUrlTls()); |
| } else { |
| throw new RestException(Status.PRECONDITION_FAILED, |
| "The replication cluster does not provide TLS encrypted service"); |
| } |
| URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost()) |
| .port(replClusterUrl.getPort()).replaceQueryParam("authoritative", false).build(); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Redirecting the rest call to {}: cluster={}", |
| clientAppId(), redirect, replCluster); |
| } |
| throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); |
| } |
| } |
| } catch (WebApplicationException wae) { |
| throw wae; |
| } catch (Exception e) { |
| throw new RestException(e); |
| } |
| |
| try { |
| NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, |
| authoritative, true); |
| List<String> topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join(); |
| for (String topic : topics) { |
| NamespaceBundle topicBundle = pulsar().getNamespaceService() |
| .getBundle(TopicName.get(topic)); |
| if (bundle.equals(topicBundle)) { |
| throw new RestException(Status.CONFLICT, "Cannot delete non empty bundle"); |
| } |
| } |
| |
| // remove from owned namespace map and ephemeral node from ZK |
| pulsar().getNamespaceService().removeOwnedServiceUnit(bundle); |
| } catch (WebApplicationException wae) { |
| throw wae; |
| } catch (Exception e) { |
| log.error("[{}] Failed to remove namespace bundle {}/{}", clientAppId(), namespaceName.toString(), |
| bundleRange, e); |
| throw new RestException(e); |
| } |
| } |
| |
| @SuppressWarnings("deprecation") |
| protected void internalDeleteNamespaceBundleForcefully(String bundleRange, boolean authoritative) { |
| validateNamespaceOperation(namespaceName, NamespaceOperation.DELETE_BUNDLE); |
| validatePoliciesReadOnlyAccess(); |
| |
| // ensure that non-global namespace is directed to the correct cluster |
| if (!namespaceName.isGlobal()) { |
| validateClusterOwnership(namespaceName.getCluster()); |
| } |
| |
| Policies policies = getNamespacePolicies(namespaceName); |
| // ensure the local cluster is the only cluster for the global namespace configuration |
| try { |
| if (namespaceName.isGlobal()) { |
| if (policies.replication_clusters.size() > 1) { |
| // There are still more than one clusters configured for the global namespace |
| throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace " |
| + namespaceName + ". There are still more than one replication clusters configured."); |
| } |
| if (policies.replication_clusters.size() == 1 |
| && !policies.replication_clusters.contains(config().getClusterName())) { |
| // the only replication cluster is other cluster, redirect |
| String replCluster = Lists.newArrayList(policies.replication_clusters).get(0); |
| ClusterData replClusterData = clusterResources().get(AdminResource.path("clusters", replCluster)) |
| .orElseThrow(() -> new RestException(Status.NOT_FOUND, |
| "Cluster " + replCluster + " does not exist")); |
| URL replClusterUrl; |
| if (!config().isTlsEnabled() || !isRequestHttps()) { |
| replClusterUrl = new URL(replClusterData.getServiceUrl()); |
| } else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) { |
| replClusterUrl = new URL(replClusterData.getServiceUrlTls()); |
| } else { |
| throw new RestException(Status.PRECONDITION_FAILED, |
| "The replication cluster does not provide TLS encrypted service"); |
| } |
| URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost()) |
| .port(replClusterUrl.getPort()) |
| .replaceQueryParam("authoritative", false).build(); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Redirecting the rest call to {}: cluster={}", |
| clientAppId(), redirect, replCluster); |
| } |
| throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); |
| } |
| } |
| } catch (WebApplicationException wae) { |
| throw wae; |
| } catch (Exception e) { |
| throw new RestException(e); |
| } |
| |
| try { |
| NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, |
| authoritative, true); |
| // directly remove from owned namespace map and ephemeral node from ZK |
| pulsar().getNamespaceService().removeOwnedServiceUnit(bundle); |
| } catch (WebApplicationException wae) { |
| throw wae; |
| } catch (Exception e) { |
| log.error("[{}] Failed to remove namespace bundle {}/{}", clientAppId(), namespaceName.toString(), |
| bundleRange, e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected void internalGrantPermissionOnNamespace(String role, Set<AuthAction> actions) { |
| validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION); |
| checkNotNull(role, "Role should not be null"); |
| checkNotNull(actions, "Actions should not be null"); |
| |
| try { |
| AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService(); |
| if (null != authService) { |
| authService.grantPermissionAsync(namespaceName, actions, role, null/*additional auth-data json*/) |
| .get(); |
| } else { |
| throw new RestException(Status.NOT_IMPLEMENTED, "Authorization is not enabled"); |
| } |
| } catch (InterruptedException e) { |
| log.error("[{}] Failed to get permissions for namespace {}", clientAppId(), namespaceName, e); |
| throw new RestException(e); |
| } catch (ExecutionException e) { |
| if (e.getCause() instanceof IllegalArgumentException) { |
| log.warn("[{}] Failed to set permissions for namespace {}: does not exist", clientAppId(), |
| namespaceName, e); |
| throw new RestException(Status.NOT_FOUND, "Namespace does not exist"); |
| } else if (e.getCause() instanceof IllegalStateException) { |
| log.warn("[{}] Failed to set permissions for namespace {}: {}", |
| clientAppId(), namespaceName, e.getCause().getMessage(), e); |
| throw new RestException(Status.CONFLICT, "Concurrent modification"); |
| } else { |
| log.error("[{}] Failed to get permissions for namespace {}", clientAppId(), namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| } |
| |
| |
| protected void internalGrantPermissionOnSubscription(String subscription, Set<String> roles) { |
| validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION); |
| checkNotNull(subscription, "Subscription should not be null"); |
| checkNotNull(roles, "Roles should not be null"); |
| |
| try { |
| AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService(); |
| if (null != authService) { |
| authService.grantSubscriptionPermissionAsync(namespaceName, subscription, roles, |
| null/* additional auth-data json */).get(); |
| } else { |
| throw new RestException(Status.NOT_IMPLEMENTED, "Authorization is not enabled"); |
| } |
| } catch (InterruptedException e) { |
| log.error("[{}] Failed to get permissions for namespace {}", clientAppId(), namespaceName, e); |
| throw new RestException(e); |
| } catch (ExecutionException e) { |
| if (e.getCause() instanceof IllegalArgumentException) { |
| log.warn("[{}] Failed to set permissions for namespace {}: does not exist", clientAppId(), |
| namespaceName); |
| throw new RestException(Status.NOT_FOUND, "Namespace does not exist"); |
| } else if (e.getCause() instanceof IllegalStateException) { |
| log.warn("[{}] Failed to set permissions for namespace {}: concurrent modification", clientAppId(), |
| namespaceName); |
| throw new RestException(Status.CONFLICT, "Concurrent modification"); |
| } else { |
| log.error("[{}] Failed to get permissions for namespace {}", clientAppId(), namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| } |
| |
| protected void internalRevokePermissionsOnNamespace(String role) { |
| validateNamespaceOperation(namespaceName, NamespaceOperation.REVOKE_PERMISSION); |
| validatePoliciesReadOnlyAccess(); |
| checkNotNull(role, "Role should not be null"); |
| updatePolicies(path(POLICIES, namespaceName.toString()), policies ->{ |
| policies.auth_policies.namespace_auth.remove(role); |
| return policies; |
| }); |
| } |
| |
| protected void internalRevokePermissionsOnSubscription(String subscriptionName, String role) { |
| validateNamespaceOperation(namespaceName, NamespaceOperation.REVOKE_PERMISSION); |
| validatePoliciesReadOnlyAccess(); |
| checkNotNull(subscriptionName, "SubscriptionName should not be null"); |
| checkNotNull(role, "Role should not be null"); |
| |
| AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService(); |
| if (null != authService) { |
| authService.revokeSubscriptionPermissionAsync(namespaceName, subscriptionName, role, |
| null/* additional auth-data json */); |
| } else { |
| throw new RestException(Status.NOT_IMPLEMENTED, "Authorization is not enabled"); |
| } |
| } |
| |
| protected Set<String> internalGetNamespaceReplicationClusters() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION, PolicyOperation.READ); |
| |
| if (!namespaceName.isGlobal()) { |
| throw new RestException(Status.PRECONDITION_FAILED, |
| "Cannot get the replication clusters for a non-global namespace"); |
| } |
| |
| Policies policies = getNamespacePolicies(namespaceName); |
| return policies.replication_clusters; |
| } |
| |
| protected void internalSetNamespaceReplicationClusters(List<String> clusterIds) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION, PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| checkNotNull(clusterIds, "ClusterIds should not be null"); |
| |
| Set<String> replicationClusterSet = Sets.newHashSet(clusterIds); |
| if (!namespaceName.isGlobal()) { |
| throw new RestException(Status.PRECONDITION_FAILED, "Cannot set replication on a non-global namespace"); |
| } |
| |
| if (replicationClusterSet.contains("global")) { |
| throw new RestException(Status.PRECONDITION_FAILED, |
| "Cannot specify global in the list of replication clusters"); |
| } |
| |
| Set<String> clusters = clusters(); |
| for (String clusterId : replicationClusterSet) { |
| if (!clusters.contains(clusterId)) { |
| throw new RestException(Status.FORBIDDEN, "Invalid cluster id: " + clusterId); |
| } |
| validatePeerClusterConflict(clusterId, replicationClusterSet); |
| } |
| for (String clusterId : replicationClusterSet) { |
| if (!clusters.contains(clusterId)) { |
| throw new RestException(Status.FORBIDDEN, "Invalid cluster id: " + clusterId); |
| } |
| validatePeerClusterConflict(clusterId, replicationClusterSet); |
| } |
| |
| for (String clusterId : replicationClusterSet) { |
| validateClusterForTenant(namespaceName.getTenant(), clusterId); |
| } |
| updatePolicies(path(POLICIES, namespaceName.toString()), policies ->{ |
| policies.replication_clusters = replicationClusterSet; |
| return policies; |
| }); |
| } |
| |
| protected void internalSetNamespaceMessageTTL(Integer messageTTL) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.TTL, PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| if (messageTTL != null && messageTTL < 0) { |
| throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for message TTL"); |
| } |
| updatePolicies(path(POLICIES, namespaceName.toString()), (policies)->{ |
| policies.message_ttl_in_seconds = messageTTL; |
| return policies; |
| }); |
| } |
| |
| protected void internalSetSubscriptionExpirationTime(int expirationTime) { |
| validateAdminAccessForTenant(namespaceName.getTenant()); |
| validatePoliciesReadOnlyAccess(); |
| |
| if (expirationTime < 0) { |
| throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for subscription expiration time"); |
| } |
| updatePolicies(path(POLICIES, namespaceName.toString()), (policies) -> { |
| policies.subscription_expiration_time_minutes = expirationTime; |
| return policies; |
| }); |
| } |
| |
| protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse, |
| AutoTopicCreationOverride autoTopicCreationOverride) { |
| final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic(); |
| validateAdminAccessForTenant(namespaceName.getTenant()); |
| validatePoliciesReadOnlyAccess(); |
| if (autoTopicCreationOverride != null) { |
| if (!AutoTopicCreationOverride.isValidOverride(autoTopicCreationOverride)) { |
| throw new RestException(Status.PRECONDITION_FAILED, |
| "Invalid configuration for autoTopicCreationOverride"); |
| } |
| if (maxPartitions > 0 && autoTopicCreationOverride.defaultNumPartitions > maxPartitions) { |
| throw new RestException(Status.NOT_ACCEPTABLE, |
| "Number of partitions should be less than or equal to " + maxPartitions); |
| } |
| } |
| // Force to read the data s.t. the watch to the cache content is setup. |
| namespaceResources().setAsync(path(POLICIES, namespaceName.toString()), (policies) -> { |
| policies.autoTopicCreationOverride = autoTopicCreationOverride; |
| return policies; |
| }).thenApply(r -> { |
| String autoOverride = (autoTopicCreationOverride != null |
| && autoTopicCreationOverride.allowAutoTopicCreation) ? "enabled" : "disabled"; |
| log.info("[{}] Successfully {} autoTopicCreation on namespace {}", clientAppId(), |
| autoOverride != null ? autoOverride : "removed", namespaceName); |
| asyncResponse.resume(Response.noContent().build()); |
| return null; |
| }).exceptionally(e -> { |
| log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, |
| e.getCause()); |
| if (e.getCause() instanceof NotFoundException) { |
| asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist")); |
| return null; |
| } |
| asyncResponse.resume(new RestException(e.getCause())); |
| return null; |
| }); |
| } |
| |
| protected void internalRemoveAutoTopicCreation(AsyncResponse asyncResponse) { |
| internalSetAutoTopicCreation(asyncResponse, null); |
| } |
| |
| protected void internalSetAutoSubscriptionCreation( |
| AsyncResponse asyncResponse, AutoSubscriptionCreationOverride autoSubscriptionCreationOverride) { |
| validateAdminAccessForTenant(namespaceName.getTenant()); |
| validatePoliciesReadOnlyAccess(); |
| |
| // Force to read the data s.t. the watch to the cache content is setup. |
| namespaceResources().setAsync(path(POLICIES, namespaceName.toString()), (policies) -> { |
| policies.autoSubscriptionCreationOverride = autoSubscriptionCreationOverride; |
| return policies; |
| }).thenApply(r -> { |
| if (autoSubscriptionCreationOverride != null) { |
| String autoOverride = autoSubscriptionCreationOverride.allowAutoSubscriptionCreation ? "enabled" |
| : "disabled"; |
| log.info("[{}] Successfully {} autoSubscriptionCreation on namespace {}", clientAppId(), |
| autoOverride != null ? autoOverride : "removed", namespaceName); |
| } |
| asyncResponse.resume(Response.noContent().build()); |
| return null; |
| }).exceptionally(e -> { |
| log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}", clientAppId(), |
| namespaceName, e.getCause()); |
| if (e.getCause() instanceof NotFoundException) { |
| asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist")); |
| return null; |
| } |
| asyncResponse.resume(new RestException(e.getCause())); |
| return null; |
| }); |
| } |
| |
| protected void internalRemoveAutoSubscriptionCreation(AsyncResponse asyncResponse) { |
| internalSetAutoSubscriptionCreation(asyncResponse, null); |
| } |
| |
| protected void internalModifyDeduplication(Boolean enableDeduplication) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| updatePolicies(path(POLICIES, namespaceName.toString()), policies ->{ |
| policies.deduplicationEnabled = enableDeduplication; |
| return policies; |
| }); |
| } |
| |
| @SuppressWarnings("deprecation") |
| protected void internalUnloadNamespace(AsyncResponse asyncResponse) { |
| validateSuperUserAccess(); |
| log.info("[{}] Unloading namespace {}", clientAppId(), namespaceName); |
| |
| if (namespaceName.isGlobal()) { |
| // check cluster ownership for a given global namespace: redirect if peer-cluster owns it |
| validateGlobalNamespaceOwnership(namespaceName); |
| } else { |
| validateClusterOwnership(namespaceName.getCluster()); |
| validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster()); |
| } |
| |
| Policies policies = getNamespacePolicies(namespaceName); |
| |
| final List<CompletableFuture<Void>> futures = Lists.newArrayList(); |
| List<String> boundaries = policies.bundles.getBoundaries(); |
| for (int i = 0; i < boundaries.size() - 1; i++) { |
| String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1)); |
| try { |
| futures.add(pulsar().getAdminClient().namespaces().unloadNamespaceBundleAsync(namespaceName.toString(), |
| bundle)); |
| } catch (PulsarServerException e) { |
| log.error("[{}] Failed to unload namespace {}", clientAppId(), namespaceName, e); |
| asyncResponse.resume(new RestException(e)); |
| return; |
| } |
| } |
| |
| FutureUtil.waitForAll(futures).handle((result, exception) -> { |
| if (exception != null) { |
| log.error("[{}] Failed to unload namespace {}", clientAppId(), namespaceName, exception); |
| if (exception.getCause() instanceof PulsarAdminException) { |
| asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause())); |
| return null; |
| } else { |
| asyncResponse.resume(new RestException(exception.getCause())); |
| return null; |
| } |
| } |
| log.info("[{}] Successfully unloaded all the bundles in namespace {}", clientAppId(), namespaceName); |
| asyncResponse.resume(Response.noContent().build()); |
| return null; |
| }); |
| } |
| |
| |
| protected void internalSetBookieAffinityGroup(BookieAffinityGroupData bookieAffinityGroup) { |
| validateSuperUserAccess(); |
| log.info("[{}] Setting bookie-affinity-group {} for namespace {}", clientAppId(), bookieAffinityGroup, |
| this.namespaceName); |
| |
| if (namespaceName.isGlobal()) { |
| // check cluster ownership for a given global namespace: redirect if peer-cluster owns it |
| validateGlobalNamespaceOwnership(namespaceName); |
| } else { |
| validateClusterOwnership(namespaceName.getCluster()); |
| validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster()); |
| } |
| |
| String path = joinPath(LOCAL_POLICIES_ROOT, this.namespaceName.toString()); |
| try { |
| getLocalPolicies().setWithCreate(path, (oldPolicies) -> { |
| LocalPolicies localPolicies = oldPolicies.orElse(new LocalPolicies()); |
| localPolicies.bookieAffinityGroup = bookieAffinityGroup; |
| log.info("[{}] Successfully updated local-policies configuration: namespace={}, map={}", clientAppId(), |
| namespaceName, localPolicies); |
| return localPolicies; |
| }); |
| } catch (NotFoundException e) { |
| log.warn("[{}] Failed to update local-policy configuration for namespace {}: does not exist", clientAppId(), |
| namespaceName); |
| throw new RestException(Status.NOT_FOUND, "Namespace does not exist"); |
| } catch (Exception e) { |
| log.error("[{}] Failed to update local-policy configuration for namespace {}", clientAppId(), namespaceName, |
| e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected void internalDeleteBookieAffinityGroup() { |
| internalSetBookieAffinityGroup(null); |
| } |
| |
| protected BookieAffinityGroupData internalGetBookieAffinityGroup() { |
| validateSuperUserAccess(); |
| |
| if (namespaceName.isGlobal()) { |
| // check cluster ownership for a given global namespace: redirect if peer-cluster owns it |
| validateGlobalNamespaceOwnership(namespaceName); |
| } else { |
| validateClusterOwnership(namespaceName.getCluster()); |
| validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster()); |
| } |
| String path = joinPath(LOCAL_POLICIES_ROOT, this.namespaceName.toString()); |
| try { |
| final BookieAffinityGroupData bookkeeperAffinityGroup = getLocalPolicies().get(path) |
| .orElseThrow(() -> new RestException(Status.NOT_FOUND, |
| "Namespace local-policies does not exist")).bookieAffinityGroup; |
| if (bookkeeperAffinityGroup == null) { |
| throw new RestException(Status.NOT_FOUND, "bookie-affinity group does not exist"); |
| } |
| return bookkeeperAffinityGroup; |
| } catch (NotFoundException e) { |
| log.warn("[{}] Failed to update local-policy configuration for namespace {}: does not exist", |
| clientAppId(), namespaceName); |
| throw new RestException(Status.NOT_FOUND, "Namespace policies does not exist"); |
| } catch (RestException re) { |
| throw re; |
| } catch (Exception e) { |
| log.error("[{}] Failed to get local-policy configuration for namespace {} at path {}", clientAppId(), |
| namespaceName, path, e); |
| throw new RestException(e); |
| } |
| } |
| |
/**
 * Unloads a single bundle of this namespace and reports the outcome through
 * {@code asyncResponse}.
 *
 * <p>The response is resumed with 204 No Content when the bundle is not owned by any broker or
 * when the unload succeeds; with the {@link WebApplicationException} raised by the bundle
 * ownership validation; or with a {@link RestException} wrapping any other failure.
 *
 * @param asyncResponse response resumed with the outcome
 * @param bundleRange bundle range to unload; must not be null
 * @param authoritative passed through to {@code validateNamespaceBundleOwnership}
 */
@SuppressWarnings("deprecation")
public void internalUnloadNamespaceBundle(AsyncResponse asyncResponse, String bundleRange, boolean authoritative) {
    validateSuperUserAccess();
    checkNotNull(bundleRange, "BundleRange should not be null");
    log.info("[{}] Unloading namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange);

    Policies policies = getNamespacePolicies(namespaceName);

    NamespaceBundle bundle =
            pulsar().getNamespaceService().getNamespaceBundleFactory()
                    .getBundle(namespaceName.toString(), bundleRange);
    boolean isOwnedByLocalCluster = false;
    try {
        isOwnedByLocalCluster = pulsar().getNamespaceService().isNamespaceBundleOwned(bundle).get();
    } catch (Exception e) {
        // Best effort: on failure the flag stays false and the ownership validation below runs.
        if (log.isDebugEnabled()) {
            log.debug("Failed to validate cluster ownership for {}-{}, {}",
                    namespaceName.toString(), bundleRange, e.getMessage(), e);
        }
    }

    // Validate namespace ownership only if the bundle is not owned by the local cluster (this can happen
    // when the broker did not receive the replication-cluster change watch and still owns the bundle).
    if (!isOwnedByLocalCluster) {
        if (namespaceName.isGlobal()) {
            // check cluster ownership for a given global namespace: redirect if peer-cluster owns it
            validateGlobalNamespaceOwnership(namespaceName);
        } else {
            validateClusterOwnership(namespaceName.getCluster());
            validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster());
        }
    }

    validatePoliciesReadOnlyAccess();

    isBundleOwnedByAnyBroker(namespaceName, policies.bundles, bundleRange).thenAccept(flag -> {
        if (!flag) {
            // Nothing to unload: answer with success.
            log.info("[{}] Namespace bundle is not owned by any broker {}/{}", clientAppId(), namespaceName,
                    bundleRange);
            asyncResponse.resume(Response.noContent().build());
            return;
        }
        NamespaceBundle nsBundle;

        try {
            nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
                    authoritative, true);
        } catch (WebApplicationException wae) {
            // Hand the validation's own response back to the client unchanged.
            asyncResponse.resume(wae);
            return;
        }

        pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle)
                .thenRun(() -> {
                    log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), nsBundle.toString());
                    asyncResponse.resume(Response.noContent().build());
                }).exceptionally(ex -> {
                    log.error("[{}] Failed to unload namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange,
                            ex);
                    asyncResponse.resume(new RestException(ex));
                    return null;
                });
    }).exceptionally((ex) -> {
        // Reached when the ownership lookup or the callback above fails; the cause is what gets reported.
        if (ex.getCause() instanceof WebApplicationException) {
            asyncResponse.resume(ex.getCause());
        } else {
            asyncResponse.resume(new RestException(ex.getCause()));
        }
        return null;
    });
}
| |
| @SuppressWarnings("deprecation") |
| protected void internalSplitNamespaceBundle(String bundleRange, |
| boolean authoritative, boolean unload, String splitAlgorithmName) { |
| validateSuperUserAccess(); |
| checkNotNull(bundleRange, "BundleRange should not be null"); |
| log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange); |
| |
| Policies policies = getNamespacePolicies(namespaceName); |
| |
| if (namespaceName.isGlobal()) { |
| // check cluster ownership for a given global namespace: redirect if peer-cluster owns it |
| validateGlobalNamespaceOwnership(namespaceName); |
| } else { |
| validateClusterOwnership(namespaceName.getCluster()); |
| validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster()); |
| } |
| |
| validatePoliciesReadOnlyAccess(); |
| |
| List<String> supportedNamespaceBundleSplitAlgorithms = |
| pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms(); |
| if (StringUtils.isNotBlank(splitAlgorithmName) |
| && !supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) { |
| throw new RestException(Status.PRECONDITION_FAILED, |
| "Unsupported namespace bundle split algorithm, supported algorithms are " |
| + supportedNamespaceBundleSplitAlgorithms); |
| } |
| |
| try { |
| NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, |
| authoritative, true); |
| pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload, |
| getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName)).get(); |
| log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString()); |
| } catch (WebApplicationException wae) { |
| throw wae; |
| } catch (ExecutionException e) { |
| if (e.getCause() instanceof IllegalArgumentException) { |
| log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), namespaceName, |
| bundleRange, e.getMessage()); |
| throw new RestException(Status.PRECONDITION_FAILED, "Split bundle failed due to invalid request"); |
| } else { |
| log.error("[{}] Failed to split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, e); |
| throw new RestException(e.getCause()); |
| } |
| } catch (Exception e) { |
| log.error("[{}] Failed to split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, e); |
| throw new RestException(e); |
| } |
| } |
| |
| private NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) { |
| NamespaceBundleSplitAlgorithm algorithm = NamespaceBundleSplitAlgorithm.of(algorithmName); |
| if (algorithm == null) { |
| algorithm = NamespaceBundleSplitAlgorithm.of( |
| pulsar().getConfig().getDefaultNamespaceBundleSplitAlgorithm()); |
| } |
| if (algorithm == null) { |
| algorithm = NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO; |
| } |
| return algorithm; |
| } |
| |
| protected void internalSetPublishRate(PublishRate maxPublishMessageRate) { |
| validateSuperUserAccess(); |
| log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate); |
| final String path = path(POLICIES, namespaceName.toString()); |
| updatePolicies(path, (policies) -> { |
| policies.publishMaxMessageRate.put(pulsar().getConfiguration().getClusterName(), maxPublishMessageRate); |
| return policies; |
| }); |
| log.info("[{}] Successfully updated the publish_max_message_rate for cluster on namespace {}", clientAppId(), |
| namespaceName); |
| } |
| |
| protected void internalRemovePublishRate() { |
| validateSuperUserAccess(); |
| log.info("[{}] Remove namespace publish-rate {}/{}", clientAppId(), namespaceName, topicName); |
| try { |
| final String path = path(POLICIES, namespaceName.toString()); |
| updatePolicies(path, (policies) -> { |
| if (policies.publishMaxMessageRate != null) { |
| policies.publishMaxMessageRate.remove(pulsar().getConfiguration().getClusterName()); |
| } |
| return policies; |
| }); |
| log.info("[{}] Successfully remove the publish_max_message_rate for cluster on namespace {}", clientAppId(), |
| namespaceName); |
| } catch (Exception e) { |
| log.error("[{}] Failed to remove the publish_max_message_rate for cluster on namespace {}", clientAppId(), |
| namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected PublishRate internalGetPublishRate() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ); |
| |
| Policies policies = getNamespacePolicies(namespaceName); |
| PublishRate publishRate = policies.publishMaxMessageRate.get(pulsar().getConfiguration().getClusterName()); |
| if (publishRate != null) { |
| return publishRate; |
| } else { |
| throw new RestException(Status.NOT_FOUND, |
| "Publish-rate is not configured for cluster " + pulsar().getConfiguration().getClusterName()); |
| } |
| } |
| |
| @SuppressWarnings("deprecation") |
| protected void internalSetTopicDispatchRate(DispatchRate dispatchRate) { |
| validateSuperUserAccess(); |
| log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate); |
| |
| try { |
| final String path = path(POLICIES, namespaceName.toString()); |
| updatePolicies(path, (policies)->{ |
| policies.topicDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate); |
| policies.clusterDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate); |
| return policies; |
| }); |
| log.info("[{}] Successfully updated the dispatchRate for cluster on namespace {}", clientAppId(), |
| namespaceName); |
| } catch (Exception e) { |
| log.error("[{}] Failed to update the dispatchRate for cluster on namespace {}", clientAppId(), |
| namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected void internalDeleteTopicDispatchRate() { |
| validateSuperUserAccess(); |
| try { |
| final String path = path(POLICIES, namespaceName.toString()); |
| updatePolicies(path, (policies) -> { |
| policies.topicDispatchRate.remove(pulsar().getConfiguration().getClusterName()); |
| policies.clusterDispatchRate.remove(pulsar().getConfiguration().getClusterName()); |
| return policies; |
| }); |
| log.info("[{}] Successfully delete the dispatchRate for cluster on namespace {}", clientAppId(), |
| namespaceName); |
| } catch (Exception e) { |
| log.error("[{}] Failed to delete the dispatchRate for cluster on namespace {}", clientAppId(), |
| namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| @SuppressWarnings("deprecation") |
| protected DispatchRate internalGetTopicDispatchRate() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ); |
| |
| Policies policies = getNamespacePolicies(namespaceName); |
| return policies.topicDispatchRate.get(pulsar().getConfiguration().getClusterName()); |
| } |
| |
| protected void internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) { |
| validateSuperUserAccess(); |
| log.info("[{}] Set namespace subscription dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate); |
| |
| try { |
| final String path = path(POLICIES, namespaceName.toString()); |
| updatePolicies(path, (policies) -> { |
| policies.subscriptionDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate); |
| return policies; |
| }); |
| log.info("[{}] Successfully updated the subscriptionDispatchRate for cluster on namespace {}", |
| clientAppId(), namespaceName); |
| } catch (Exception e) { |
| log.error("[{}] Failed to update the subscriptionDispatchRate for cluster on namespace {}", clientAppId(), |
| namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected void internalDeleteSubscriptionDispatchRate() { |
| validateSuperUserAccess(); |
| |
| try { |
| final String path = path(POLICIES, namespaceName.toString()); |
| updatePolicies(path, (policies) -> { |
| policies.subscriptionDispatchRate.remove(pulsar().getConfiguration().getClusterName()); |
| return policies; |
| }); |
| log.info("[{}] Successfully delete the subscriptionDispatchRate for cluster on namespace {}", |
| clientAppId(), namespaceName); |
| } catch (Exception e) { |
| log.error("[{}] Failed to delete the subscriptionDispatchRate for cluster on namespace {}", clientAppId(), |
| namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected DispatchRate internalGetSubscriptionDispatchRate() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ); |
| |
| Policies policies = getNamespacePolicies(namespaceName); |
| return policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName()); |
| } |
| |
| protected void internalSetSubscribeRate(SubscribeRate subscribeRate) { |
| validateSuperUserAccess(); |
| log.info("[{}] Set namespace subscribe-rate {}/{}", clientAppId(), namespaceName, subscribeRate); |
| try { |
| final String path = path(POLICIES, namespaceName.toString()); |
| updatePolicies(path, (policies) -> { |
| policies.clusterSubscribeRate.put(pulsar().getConfiguration().getClusterName(), subscribeRate); |
| return policies; |
| }); |
| log.info("[{}] Successfully updated the subscribeRate for cluster on namespace {}", clientAppId(), |
| namespaceName); |
| } catch (Exception e) { |
| log.error("[{}] Failed to update the subscribeRate for cluster on namespace {}", clientAppId(), |
| namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected void internalDeleteSubscribeRate() { |
| validateSuperUserAccess(); |
| try { |
| final String path = path(POLICIES, namespaceName.toString()); |
| updatePolicies(path, (policies) -> { |
| policies.clusterSubscribeRate.remove(pulsar().getConfiguration().getClusterName()); |
| return policies; |
| }); |
| log.info("[{}] Successfully delete the subscribeRate for cluster on namespace {}", clientAppId(), |
| namespaceName); |
| } catch (Exception e) { |
| log.error("[{}] Failed to delete the subscribeRate for cluster on namespace {}", clientAppId(), |
| namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected SubscribeRate internalGetSubscribeRate() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ); |
| Policies policies = getNamespacePolicies(namespaceName); |
| return policies.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName()); |
| } |
| |
| protected void internalRemoveReplicatorDispatchRate() { |
| validateSuperUserAccess(); |
| try { |
| final String path = path(POLICIES, namespaceName.toString()); |
| updatePolicies(path, (policies) -> { |
| policies.replicatorDispatchRate.remove(pulsar().getConfiguration().getClusterName()); |
| return policies; |
| }); |
| log.info("[{}] Successfully delete the replicatorDispatchRate for cluster on namespace {}", clientAppId(), |
| namespaceName); |
| } catch (Exception e) { |
| log.error("[{}] Failed to delete the replicatorDispatchRate for cluster on namespace {}", clientAppId(), |
| namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected void internalSetReplicatorDispatchRate(DispatchRate dispatchRate) { |
| validateSuperUserAccess(); |
| log.info("[{}] Set namespace replicator dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate); |
| try { |
| final String path = path(POLICIES, namespaceName.toString()); |
| updatePolicies(path, (policies) -> { |
| policies.replicatorDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate); |
| return policies; |
| }); |
| log.info("[{}] Successfully updated the replicatorDispatchRate for cluster on namespace {}", clientAppId(), |
| namespaceName); |
| } catch (Exception e) { |
| log.error("[{}] Failed to update the replicatorDispatchRate for cluster on namespace {}", clientAppId(), |
| namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected DispatchRate internalGetReplicatorDispatchRate() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION_RATE, PolicyOperation.READ); |
| |
| Policies policies = getNamespacePolicies(namespaceName); |
| return policies.replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName()); |
| } |
| |
| protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.BACKLOG, PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| final BacklogQuotaType quotaType = backlogQuotaType != null ? backlogQuotaType |
| : BacklogQuotaType.destination_storage; |
| try { |
| final String path = path(POLICIES, namespaceName.toString()); |
| Policies policies = namespaceResources().get(path) |
| .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace policies does not exist")); |
| RetentionPolicies r = policies.retention_policies; |
| if (r != null) { |
| Policies p = new Policies(); |
| p.backlog_quota_map.put(quotaType, backlogQuota); |
| if (!checkQuotas(p, r)) { |
| log.warn( |
| "[{}] Failed to update backlog configuration" |
| + " for namespace {}: conflicts with retention quota", |
| clientAppId(), namespaceName); |
| new RestException(Status.PRECONDITION_FAILED, |
| "Backlog Quota exceeds configured retention quota for namespace." |
| + " Please increase retention quota and retry"); |
| } |
| } |
| policies.backlog_quota_map.put(quotaType, backlogQuota); |
| namespaceResources().set(path, p -> policies); |
| log.info("[{}] Successfully updated backlog quota map: namespace={}, map={}", clientAppId(), namespaceName, |
| jsonMapper().writeValueAsString(backlogQuota)); |
| |
| } catch (RestException pfe) { |
| throw pfe; |
| } catch (Exception e) { |
| log.error("[{}] Failed to update backlog quota map for namespace {}", clientAppId(), namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected void internalRemoveBacklogQuota(BacklogQuotaType backlogQuotaType) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.BACKLOG, PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| final BacklogQuotaType quotaType = backlogQuotaType != null ? backlogQuotaType |
| : BacklogQuotaType.destination_storage; |
| try { |
| final String path = path(POLICIES, namespaceName.toString()); |
| updatePolicies(path, (policies) -> { |
| policies.backlog_quota_map.remove(quotaType); |
| return policies; |
| }); |
| log.info("[{}] Successfully removed backlog namespace={}, quota={}", clientAppId(), namespaceName, |
| backlogQuotaType); |
| } catch (Exception e) { |
| log.error("[{}] Failed to update backlog quota map for namespace {}", clientAppId(), namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected void internalSetRetention(RetentionPolicies retention) { |
| validateRetentionPolicies(retention); |
| validateNamespacePolicyOperation(namespaceName, PolicyName.RETENTION, PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| |
| try { |
| final String path = path(POLICIES, namespaceName.toString()); |
| Policies policies = namespaceResources().get(path).orElseThrow(() -> new RestException(Status.NOT_FOUND, |
| "Namespace policies does not exist")); |
| if (!checkQuotas(policies, retention)) { |
| log.warn("[{}] Failed to update retention configuration" |
| + " for namespace {}: conflicts with backlog quota", |
| clientAppId(), namespaceName); |
| new RestException(Status.PRECONDITION_FAILED, |
| "Retention Quota must exceed configured backlog quota for namespace."); |
| } |
| policies.retention_policies = retention; |
| namespaceResources().set(path, p -> policies); |
| log.info("[{}] Successfully updated retention configuration: namespace={}, map={}", clientAppId(), |
| namespaceName, jsonMapper().writeValueAsString(retention)); |
| } catch (RestException pfe) { |
| throw pfe; |
| } catch (Exception e) { |
| log.error("[{}] Failed to update retention configuration for namespace {}", clientAppId(), namespaceName, |
| e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected void internalDeletePersistence() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| doUpdatePersistence(null); |
| } |
| |
| protected void internalSetPersistence(PersistencePolicies persistence) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| validatePersistencePolicies(persistence); |
| |
| doUpdatePersistence(persistence); |
| } |
| |
| private void doUpdatePersistence(PersistencePolicies persistence) { |
| try { |
| final String path = path(POLICIES, namespaceName.toString()); |
| updatePolicies(path, (policies)->{ |
| policies.persistence = persistence; |
| return policies; |
| }); |
| log.info("[{}] Successfully updated persistence configuration: namespace={}, map={}", clientAppId(), |
| namespaceName, jsonMapper().writeValueAsString(persistence)); |
| } catch (Exception e) { |
| log.error("[{}] Failed to update persistence configuration for namespace {}", clientAppId(), namespaceName, |
| e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected PersistencePolicies internalGetPersistence() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ); |
| |
| Policies policies = getNamespacePolicies(namespaceName); |
| return policies.persistence; |
| } |
| |
| protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse, boolean authoritative) { |
| validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG); |
| |
| final List<CompletableFuture<Void>> futures = Lists.newArrayList(); |
| try { |
| NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() |
| .getBundles(namespaceName); |
| for (NamespaceBundle nsBundle : bundles.getBundles()) { |
| // check if the bundle is owned by any broker, if not then there is no backlog on this bundle to clear |
| if (pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) { |
| futures.add(pulsar().getAdminClient().namespaces() |
| .clearNamespaceBundleBacklogAsync(namespaceName.toString(), nsBundle.getBundleRange())); |
| } |
| } |
| } catch (WebApplicationException wae) { |
| asyncResponse.resume(wae); |
| return; |
| } catch (Exception e) { |
| asyncResponse.resume(new RestException(e)); |
| return; |
| } |
| |
| FutureUtil.waitForAll(futures).handle((result, exception) -> { |
| if (exception != null) { |
| log.warn("[{}] Failed to clear backlog on the bundles for namespace {}: {}", clientAppId(), |
| namespaceName, exception.getCause().getMessage()); |
| if (exception.getCause() instanceof PulsarAdminException) { |
| asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause())); |
| return null; |
| } else { |
| asyncResponse.resume(new RestException(exception.getCause())); |
| return null; |
| } |
| } |
| log.info("[{}] Successfully cleared backlog on all the bundles for namespace {}", clientAppId(), |
| namespaceName); |
| asyncResponse.resume(Response.noContent().build()); |
| return null; |
| }); |
| } |
| |
| @SuppressWarnings("deprecation") |
| protected void internalClearNamespaceBundleBacklog(String bundleRange, boolean authoritative) { |
| validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG); |
| checkNotNull(bundleRange, "BundleRange should not be null"); |
| |
| Policies policies = getNamespacePolicies(namespaceName); |
| |
| if (namespaceName.isGlobal()) { |
| // check cluster ownership for a given global namespace: redirect if peer-cluster owns it |
| validateGlobalNamespaceOwnership(namespaceName); |
| } else { |
| validateClusterOwnership(namespaceName.getCluster()); |
| validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster()); |
| } |
| |
| validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true); |
| |
| clearBacklog(namespaceName, bundleRange, null); |
| log.info("[{}] Successfully cleared backlog on namespace bundle {}/{}", clientAppId(), namespaceName, |
| bundleRange); |
| } |
| |
| protected void internalClearNamespaceBacklogForSubscription(AsyncResponse asyncResponse, String subscription, |
| boolean authoritative) { |
| validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG); |
| checkNotNull(subscription, "Subscription should not be null"); |
| |
| final List<CompletableFuture<Void>> futures = Lists.newArrayList(); |
| try { |
| NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() |
| .getBundles(namespaceName); |
| for (NamespaceBundle nsBundle : bundles.getBundles()) { |
| // check if the bundle is owned by any broker, if not then there is no backlog on this bundle to clear |
| if (pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) { |
| futures.add(pulsar().getAdminClient().namespaces().clearNamespaceBundleBacklogForSubscriptionAsync( |
| namespaceName.toString(), nsBundle.getBundleRange(), subscription)); |
| } |
| } |
| } catch (WebApplicationException wae) { |
| asyncResponse.resume(wae); |
| return; |
| } catch (Exception e) { |
| asyncResponse.resume(new RestException(e)); |
| return; |
| } |
| |
| FutureUtil.waitForAll(futures).handle((result, exception) -> { |
| if (exception != null) { |
| log.warn("[{}] Failed to clear backlog for subscription {} on the bundles for namespace {}: {}", |
| clientAppId(), subscription, namespaceName, exception.getCause().getMessage()); |
| if (exception.getCause() instanceof PulsarAdminException) { |
| asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause())); |
| return null; |
| } else { |
| asyncResponse.resume(new RestException(exception.getCause())); |
| return null; |
| } |
| } |
| log.info("[{}] Successfully cleared backlog for subscription {} on all the bundles for namespace {}", |
| clientAppId(), subscription, namespaceName); |
| asyncResponse.resume(Response.noContent().build()); |
| return null; |
| }); |
| } |
| |
| @SuppressWarnings("deprecation") |
| protected void internalClearNamespaceBundleBacklogForSubscription(String subscription, String bundleRange, |
| boolean authoritative) { |
| validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG); |
| checkNotNull(subscription, "Subscription should not be null"); |
| checkNotNull(bundleRange, "BundleRange should not be null"); |
| |
| Policies policies = getNamespacePolicies(namespaceName); |
| |
| if (namespaceName.isGlobal()) { |
| // check cluster ownership for a given global namespace: redirect if peer-cluster owns it |
| validateGlobalNamespaceOwnership(namespaceName); |
| } else { |
| validateClusterOwnership(namespaceName.getCluster()); |
| validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster()); |
| } |
| |
| validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true); |
| |
| clearBacklog(namespaceName, bundleRange, subscription); |
| log.info("[{}] Successfully cleared backlog for subscription {} on namespace bundle {}/{}", clientAppId(), |
| subscription, namespaceName, bundleRange); |
| } |
| |
| protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse, String subscription, |
| boolean authoritative) { |
| validateNamespaceOperation(namespaceName, NamespaceOperation.UNSUBSCRIBE); |
| checkNotNull(subscription, "Subscription should not be null"); |
| |
| final List<CompletableFuture<Void>> futures = Lists.newArrayList(); |
| try { |
| NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() |
| .getBundles(namespaceName); |
| for (NamespaceBundle nsBundle : bundles.getBundles()) { |
| // check if the bundle is owned by any broker, if not then there are no subscriptions |
| if (pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) { |
| futures.add(pulsar().getAdminClient().namespaces().unsubscribeNamespaceBundleAsync( |
| namespaceName.toString(), nsBundle.getBundleRange(), subscription)); |
| } |
| } |
| } catch (WebApplicationException wae) { |
| asyncResponse.resume(wae); |
| return; |
| } catch (Exception e) { |
| asyncResponse.resume(new RestException(e)); |
| return; |
| } |
| |
| FutureUtil.waitForAll(futures).handle((result, exception) -> { |
| if (exception != null) { |
| log.warn("[{}] Failed to unsubscribe {} on the bundles for namespace {}: {}", clientAppId(), |
| subscription, namespaceName, exception.getCause().getMessage()); |
| if (exception.getCause() instanceof PulsarAdminException) { |
| asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause())); |
| return null; |
| } else { |
| asyncResponse.resume(new RestException(exception.getCause())); |
| return null; |
| } |
| } |
| log.info("[{}] Successfully unsubscribed {} on all the bundles for namespace {}", clientAppId(), |
| subscription, namespaceName); |
| asyncResponse.resume(Response.noContent().build()); |
| return null; |
| }); |
| } |
| |
| @SuppressWarnings("deprecation") |
| protected void internalUnsubscribeNamespaceBundle(String subscription, String bundleRange, boolean authoritative) { |
| validateNamespaceOperation(namespaceName, NamespaceOperation.UNSUBSCRIBE); |
| checkNotNull(subscription, "Subscription should not be null"); |
| checkNotNull(bundleRange, "BundleRange should not be null"); |
| |
| Policies policies = getNamespacePolicies(namespaceName); |
| |
| if (namespaceName.isGlobal()) { |
| // check cluster ownership for a given global namespace: redirect if peer-cluster owns it |
| validateGlobalNamespaceOwnership(namespaceName); |
| } else { |
| validateClusterOwnership(namespaceName.getCluster()); |
| validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster()); |
| } |
| |
| validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true); |
| |
| unsubscribe(namespaceName, bundleRange, subscription); |
| log.info("[{}] Successfully unsubscribed {} on namespace bundle {}/{}", clientAppId(), subscription, |
| namespaceName, bundleRange); |
| } |
| |
| protected void internalSetSubscriptionAuthMode(SubscriptionAuthMode subscriptionAuthMode) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| SubscriptionAuthMode authMode = subscriptionAuthMode == null ? subscriptionAuthMode = SubscriptionAuthMode.None |
| : subscriptionAuthMode; |
| try { |
| final String path = path(POLICIES, namespaceName.toString()); |
| updatePolicies(path, (policies) -> { |
| policies.subscription_auth_mode = authMode; |
| return policies; |
| }); |
| log.info("[{}] Successfully updated subscription auth mode: namespace={}, map={}", clientAppId(), |
| namespaceName, jsonMapper().writeValueAsString(authMode)); |
| |
| } catch (RestException pfe) { |
| throw pfe; |
| } catch (Exception e) { |
| log.error("[{}] Failed to update subscription auth mode for namespace {}", clientAppId(), namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected void internalModifyEncryptionRequired(boolean encryptionRequired) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.ENCRYPTION, PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| |
| try { |
| updatePolicies(path(POLICIES, namespaceName.toString()), (policies) -> { |
| policies.encryption_required = encryptionRequired; |
| return policies; |
| }); |
| log.info("[{}] Successfully {} on namespace {}", clientAppId(), encryptionRequired ? "true" : "false", |
| namespaceName); |
| } catch (Exception e) { |
| log.error("[{}] Failed to modify encryption required status on namespace {}", clientAppId(), |
| namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected DelayedDeliveryPolicies internalGetDelayedDelivery() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.DELAYED_DELIVERY, PolicyOperation.READ); |
| return getNamespacePolicies(namespaceName).delayed_delivery_policies; |
| } |
| |
| protected InactiveTopicPolicies internalGetInactiveTopic() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.INACTIVE_TOPIC, PolicyOperation.READ); |
| |
| Policies policies = getNamespacePolicies(namespaceName); |
| return policies.inactive_topic_policies; |
| } |
| |
| protected void internalSetInactiveTopic(InactiveTopicPolicies inactiveTopicPolicies) { |
| validateSuperUserAccess(); |
| validatePoliciesReadOnlyAccess(); |
| internalSetPolicies("inactive_topic_policies", inactiveTopicPolicies); |
| } |
| |
| protected void internalSetPolicies(String fieldName, Object value) { |
| try { |
| final String path = path(POLICIES, namespaceName.toString()); |
| Policies policies = namespaceResources().get(path).orElseThrow(() -> new RestException(Status.NOT_FOUND, |
| "Namespace policies does not exist")); |
| Field field = Policies.class.getDeclaredField(fieldName); |
| field.setAccessible(true); |
| field.set(policies, value); |
| namespaceResources().set(path, p -> policies); |
| log.info("[{}] Successfully updated {} configuration: namespace={}, value={}", clientAppId(), fieldName, |
| namespaceName, jsonMapper().writeValueAsString(value)); |
| |
| } catch (RestException pfe) { |
| throw pfe; |
| } catch (Exception e) { |
| log.error("[{}] Failed to update {} configuration for namespace {}", clientAppId(), fieldName |
| , namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected void internalSetDelayedDelivery(DelayedDeliveryPolicies delayedDeliveryPolicies) { |
| validateSuperUserAccess(); |
| validatePoliciesReadOnlyAccess(); |
| internalSetPolicies("delayed_delivery_policies", delayedDeliveryPolicies); |
| } |
| |
| protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE); |
| checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be null"); |
| validatePoliciesReadOnlyAccess(); |
| |
| log.info("[{}] Setting anti-affinity group {} for {}", clientAppId(), antiAffinityGroup, namespaceName); |
| |
| if (isBlank(antiAffinityGroup)) { |
| throw new RestException(Status.PRECONDITION_FAILED, "antiAffinityGroup can't be empty"); |
| } |
| |
| try { |
| String path = joinPath(LOCAL_POLICIES_ROOT, this.namespaceName.toString()); |
| getLocalPolicies().setWithCreate(path, (lp)->{ |
| LocalPolicies localPolicies = lp.orElse(new LocalPolicies()); |
| localPolicies.namespaceAntiAffinityGroup = antiAffinityGroup; |
| return localPolicies; |
| }); |
| log.info("[{}] Successfully updated local-policies configuration: namespace={}, map={}", clientAppId(), |
| namespaceName, antiAffinityGroup); |
| } catch (Exception e) { |
| log.error("[{}] Failed to update local-policy configuration for namespace {}", clientAppId(), namespaceName, |
| e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected String internalGetNamespaceAntiAffinityGroup() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ); |
| |
| try { |
| return getLocalPolicies() |
| .get(AdminResource.joinPath(LOCAL_POLICIES_ROOT, namespaceName.toString())) |
| .orElse(new LocalPolicies()).namespaceAntiAffinityGroup; |
| } catch (Exception e) { |
| log.error("[{}] Failed to get the antiAffinityGroup of namespace {}", clientAppId(), namespaceName, e); |
| throw new RestException(Status.NOT_FOUND, "Couldn't find namespace policies"); |
| } |
| } |
| |
| protected void internalRemoveNamespaceAntiAffinityGroup() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| |
| log.info("[{}] Deleting anti-affinity group for {}", clientAppId(), namespaceName); |
| |
| try { |
| final String path = joinPath(LOCAL_POLICIES_ROOT, namespaceName.toString()); |
| getLocalPolicies().set(path, (policies)->{ |
| policies.namespaceAntiAffinityGroup = null; |
| return policies; |
| }); |
| log.info("[{}] Successfully removed anti-affinity group for a namespace={}", clientAppId(), namespaceName); |
| } catch (Exception e) { |
| log.error("[{}] Failed to remove anti-affinity group for namespace {}", clientAppId(), namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected List<String> internalGetAntiAffinityNamespaces(String cluster, String antiAffinityGroup, |
| String tenant) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ); |
| checkNotNull(cluster, "Cluster should not be null"); |
| checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be null"); |
| checkNotNull(tenant, "Tenant should not be null"); |
| |
| log.info("[{}]-{} Finding namespaces for {} in {}", clientAppId(), tenant, antiAffinityGroup, cluster); |
| |
| if (isBlank(antiAffinityGroup)) { |
| throw new RestException(Status.PRECONDITION_FAILED, "anti-affinity group can't be empty."); |
| } |
| validateClusterExists(cluster); |
| |
| try { |
| List<String> namespaces = getListOfNamespaces(tenant); |
| |
| return namespaces.stream().filter(ns -> { |
| Optional<LocalPolicies> policies; |
| try { |
| policies = getLocalPolicies() |
| .get(AdminResource.joinPath(LOCAL_POLICIES_ROOT, ns)); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| |
| String storedAntiAffinityGroup = policies.orElse(new LocalPolicies()).namespaceAntiAffinityGroup; |
| return antiAffinityGroup.equalsIgnoreCase(storedAntiAffinityGroup); |
| }).collect(Collectors.toList()); |
| |
| } catch (Exception e) { |
| log.warn("Failed to list of properties/namespace from global-zk", e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected RetentionPolicies internalGetRetention() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.RETENTION, PolicyOperation.READ); |
| |
| Policies policies = getNamespacePolicies(namespaceName); |
| return policies.retention_policies; |
| } |
| |
| private boolean checkQuotas(Policies policies, RetentionPolicies retention) { |
| Map<BacklogQuota.BacklogQuotaType, BacklogQuota> backlogQuotaMap = policies.backlog_quota_map; |
| if (backlogQuotaMap.isEmpty()) { |
| return true; |
| } |
| BacklogQuota quota = backlogQuotaMap.get(BacklogQuotaType.destination_storage); |
| return checkBacklogQuota(quota, retention); |
| } |
| |
| private void clearBacklog(NamespaceName nsName, String bundleRange, String subscription) { |
| try { |
| List<Topic> topicList = pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(), |
| nsName.toString() + "/" + bundleRange); |
| |
| List<CompletableFuture<Void>> futures = Lists.newArrayList(); |
| if (subscription != null) { |
| if (subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) { |
| subscription = PersistentReplicator.getRemoteCluster(subscription); |
| } |
| for (Topic topic : topicList) { |
| if (topic instanceof PersistentTopic |
| && !SystemTopicClient.isSystemTopic(TopicName.get(topic.getName()))) { |
| futures.add(((PersistentTopic) topic).clearBacklog(subscription)); |
| } |
| } |
| } else { |
| for (Topic topic : topicList) { |
| if (topic instanceof PersistentTopic |
| && !SystemTopicClient.isSystemTopic(TopicName.get(topic.getName()))) { |
| futures.add(((PersistentTopic) topic).clearBacklog()); |
| } |
| } |
| } |
| |
| FutureUtil.waitForAll(futures).get(); |
| } catch (Exception e) { |
| log.error("[{}] Failed to clear backlog for namespace {}/{}, subscription: {}", clientAppId(), |
| nsName.toString(), bundleRange, subscription, e); |
| throw new RestException(e); |
| } |
| } |
| |
| private void unsubscribe(NamespaceName nsName, String bundleRange, String subscription) { |
| try { |
| List<Topic> topicList = pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(), |
| nsName.toString() + "/" + bundleRange); |
| List<CompletableFuture<Void>> futures = Lists.newArrayList(); |
| if (subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) { |
| throw new RestException(Status.PRECONDITION_FAILED, "Cannot unsubscribe a replication cursor"); |
| } else { |
| for (Topic topic : topicList) { |
| Subscription sub = topic.getSubscription(subscription); |
| if (sub != null) { |
| futures.add(sub.delete()); |
| } |
| } |
| } |
| FutureUtil.waitForAll(futures).get(); |
| } catch (RestException re) { |
| throw re; |
| } catch (Exception e) { |
| log.error("[{}] Failed to unsubscribe {} for namespace {}/{}", clientAppId(), subscription, |
| nsName.toString(), bundleRange, e); |
| if (e.getCause() instanceof SubscriptionBusyException) { |
| throw new RestException(Status.PRECONDITION_FAILED, "Subscription has active connected consumers"); |
| } |
| throw new RestException(e.getCause()); |
| } |
| } |
| |
| /** |
| * It validates that peer-clusters can't coexist in replication-clusters. |
| * |
| * @param clusterName: given cluster whose peer-clusters can't be present into replication-cluster list |
| * @param replicationClusters: replication-cluster list |
| */ |
| private void validatePeerClusterConflict(String clusterName, Set<String> replicationClusters) { |
| try { |
| ClusterData clusterData = clusterResources().get(path("clusters", clusterName)).orElseThrow( |
| () -> new RestException(Status.PRECONDITION_FAILED, "Invalid replication cluster " + clusterName)); |
| Set<String> peerClusters = clusterData.getPeerClusterNames(); |
| if (peerClusters != null && !peerClusters.isEmpty()) { |
| SetView<String> conflictPeerClusters = Sets.intersection(peerClusters, replicationClusters); |
| if (!conflictPeerClusters.isEmpty()) { |
| log.warn("[{}] {}'s peer cluster can't be part of replication clusters {}", clientAppId(), |
| clusterName, conflictPeerClusters); |
| throw new RestException(Status.CONFLICT, |
| String.format("%s's peer-clusters %s can't be part of replication-clusters %s", clusterName, |
| conflictPeerClusters, replicationClusters)); |
| } |
| } |
| } catch (RestException re) { |
| throw re; |
| } catch (Exception e) { |
| log.warn("[{}] Failed to get cluster-data for {}", clientAppId(), clusterName, e); |
| } |
| } |
| |
| protected BundlesData validateBundlesData(BundlesData initialBundles) { |
| SortedSet<String> partitions = new TreeSet<String>(); |
| for (String partition : initialBundles.getBoundaries()) { |
| Long partBoundary = Long.decode(partition); |
| partitions.add(String.format("0x%08x", partBoundary)); |
| } |
| if (partitions.size() != initialBundles.getBoundaries().size()) { |
| if (log.isDebugEnabled()) { |
| log.debug("Input bundles included repeated partition points. Ignored."); |
| } |
| } |
| try { |
| NamespaceBundleFactory.validateFullRange(partitions); |
| } catch (IllegalArgumentException iae) { |
| throw new RestException(Status.BAD_REQUEST, "Input bundles do not cover the whole hash range. first:" |
| + partitions.first() + ", last:" + partitions.last()); |
| } |
| List<String> bundles = Lists.newArrayList(); |
| bundles.addAll(partitions); |
| return new BundlesData(bundles); |
| } |
| |
| private void validatePolicies(NamespaceName ns, Policies policies) { |
| if (ns.isV2() && policies.replication_clusters.isEmpty()) { |
| // Default to local cluster |
| policies.replication_clusters = Collections.singleton(config().getClusterName()); |
| } |
| |
| // Validate cluster names and permissions |
| policies.replication_clusters.forEach(cluster -> validateClusterForTenant(ns.getTenant(), cluster)); |
| |
| if (policies.message_ttl_in_seconds != null && policies.message_ttl_in_seconds < 0) { |
| throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for message TTL"); |
| } |
| |
| if (policies.bundles != null && policies.bundles.getNumBundles() > 0) { |
| if (policies.bundles.getBoundaries() == null || policies.bundles.getBoundaries().size() == 0) { |
| policies.bundles = getBundles(policies.bundles.getNumBundles()); |
| } else { |
| policies.bundles = validateBundlesData(policies.bundles); |
| } |
| } else { |
| int defaultNumberOfBundles = config().getDefaultNumberOfNamespaceBundles(); |
| policies.bundles = getBundles(defaultNumberOfBundles); |
| } |
| |
| if (policies.persistence != null) { |
| validatePersistencePolicies(policies.persistence); |
| } |
| |
| if (policies.retention_policies != null) { |
| validateRetentionPolicies(policies.retention_policies); |
| } |
| } |
| |
| protected void validateRetentionPolicies(RetentionPolicies retention) { |
| if (retention == null) { |
| return; |
| } |
| checkArgument(retention.getRetentionSizeInMB() >= -1, |
| "Invalid retention policy: size limit must be >= -1"); |
| checkArgument(retention.getRetentionTimeInMinutes() >= -1, |
| "Invalid retention policy: time limit must be >= -1"); |
| checkArgument((retention.getRetentionTimeInMinutes() != 0 && retention.getRetentionSizeInMB() != 0) |
| || (retention.getRetentionTimeInMinutes() == 0 && retention.getRetentionSizeInMB() == 0), |
| "Invalid retention policy: Setting a single time or size limit to 0 is invalid when " |
| + "one of the limits has a non-zero value. Use the value of -1 instead of 0 to ignore a " |
| + "specific limit. To disable retention both limits must be set to 0."); |
| } |
| |
| protected Integer internalGetMaxProducersPerTopic() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_PRODUCERS, PolicyOperation.READ); |
| return getNamespacePolicies(namespaceName).max_producers_per_topic; |
| } |
| |
| protected Integer internalGetDeduplicationSnapshotInterval() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.READ); |
| return getNamespacePolicies(namespaceName).deduplicationSnapshotIntervalSeconds; |
| } |
| |
| protected void internalSetDeduplicationSnapshotInterval(Integer interval) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE); |
| if (interval != null && interval < 0) { |
| throw new RestException(Status.PRECONDITION_FAILED, "interval must be greater than or equal to 0"); |
| } |
| internalSetPolicies("deduplicationSnapshotIntervalSeconds", interval); |
| } |
| |
| protected void internalSetMaxProducersPerTopic(Integer maxProducersPerTopic) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| |
| try { |
| if (maxProducersPerTopic != null && maxProducersPerTopic < 0) { |
| throw new RestException(Status.PRECONDITION_FAILED, |
| "maxProducersPerTopic must be 0 or more"); |
| } |
| final String path = path(POLICIES, namespaceName.toString()); |
| updatePolicies(path, (policies)->{ |
| policies.max_producers_per_topic = maxProducersPerTopic; |
| return policies; |
| }); |
| log.info("[{}] Successfully updated maxProducersPerTopic configuration: namespace={}, value={}", |
| clientAppId(), namespaceName, maxProducersPerTopic); |
| } catch (RestException pfe) { |
| throw pfe; |
| } catch (Exception e) { |
| log.error("[{}] Failed to update maxProducersPerTopic configuration for namespace {}", |
| clientAppId(), namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected Boolean internalGetDeduplication() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.READ); |
| return getNamespacePolicies(namespaceName).deduplicationEnabled; |
| } |
| |
| protected Integer internalGetMaxConsumersPerTopic() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ); |
| return getNamespacePolicies(namespaceName).max_consumers_per_topic; |
| } |
| |
| protected void internalSetMaxConsumersPerTopic(Integer maxConsumersPerTopic) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| |
| try { |
| if (maxConsumersPerTopic != null && maxConsumersPerTopic < 0) { |
| throw new RestException(Status.PRECONDITION_FAILED, "maxConsumersPerTopic must be 0 or more"); |
| } |
| final String path = path(POLICIES, namespaceName.toString()); |
| updatePolicies(path, (policies) -> { |
| policies.max_consumers_per_topic = maxConsumersPerTopic; |
| return policies; |
| }); |
| log.info("[{}] Successfully updated maxConsumersPerTopic configuration: namespace={}, value={}", |
| clientAppId(), namespaceName, maxConsumersPerTopic); |
| |
| } catch (RestException pfe) { |
| throw pfe; |
| } catch (Exception e) { |
| log.error("[{}] Failed to update maxConsumersPerTopic configuration for namespace {}", |
| clientAppId(), namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected int internalGetMaxConsumersPerSubscription() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ); |
| return getNamespacePolicies(namespaceName).max_consumers_per_subscription; |
| } |
| |
| protected void internalSetMaxConsumersPerSubscription(int maxConsumersPerSubscription) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| |
| try { |
| if (maxConsumersPerSubscription < 0) { |
| throw new RestException(Status.PRECONDITION_FAILED, |
| "maxConsumersPerSubscription must be 0 or more"); |
| } |
| final String path = path(POLICIES, namespaceName.toString()); |
| updatePolicies(path, (policies) -> { |
| policies.max_consumers_per_subscription = maxConsumersPerSubscription; |
| return policies; |
| }); |
| log.info("[{}] Successfully updated maxConsumersPerSubscription configuration: namespace={}, value={}", |
| clientAppId(), namespaceName, maxConsumersPerSubscription); |
| } catch (RestException pfe) { |
| throw pfe; |
| } catch (Exception e) { |
| log.error("[{}] Failed to update maxConsumersPerSubscription configuration for namespace {}", |
| clientAppId(), namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected Integer internalGetMaxUnackedMessagesPerConsumer() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.READ); |
| return getNamespacePolicies(namespaceName).max_unacked_messages_per_consumer; |
| } |
| |
| protected void internalSetMaxUnackedMessagesPerConsumer(Integer maxUnackedMessagesPerConsumer) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| if (maxUnackedMessagesPerConsumer != null && maxUnackedMessagesPerConsumer < 0) { |
| throw new RestException(Status.PRECONDITION_FAILED, |
| "maxUnackedMessagesPerConsumer must be 0 or more"); |
| } |
| try { |
| final String path = path(POLICIES, namespaceName.toString()); |
| updatePolicies(path, (policies)->{ |
| policies.max_unacked_messages_per_consumer = maxUnackedMessagesPerConsumer; |
| return policies; |
| }); |
| log.info("[{}] Successfully updated maxUnackedMessagesPerConsumer configuration: namespace={}, value={}", |
| clientAppId(), namespaceName, maxUnackedMessagesPerConsumer); |
| |
| } catch (RestException pfe) { |
| throw pfe; |
| } catch (Exception e) { |
| log.error("[{}] Failed to update maxUnackedMessagesPerConsumer configuration for namespace {}", |
| clientAppId(), namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected Integer internalGetMaxUnackedMessagesPerSubscription() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.READ); |
| return getNamespacePolicies(namespaceName).max_unacked_messages_per_subscription; |
| } |
| |
| protected Integer internalGetMaxSubscriptionsPerTopic() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.READ); |
| return getNamespacePolicies(namespaceName).max_subscriptions_per_topic; |
| } |
| |
| protected void internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic){ |
| validateSuperUserAccess(); |
| validatePoliciesReadOnlyAccess(); |
| if (maxSubscriptionsPerTopic != null && maxSubscriptionsPerTopic < 0) { |
| throw new RestException(Status.PRECONDITION_FAILED, |
| "maxSubscriptionsPerTopic must be 0 or more"); |
| } |
| internalSetPolicies("max_subscriptions_per_topic", maxSubscriptionsPerTopic); |
| } |
| |
| protected void internalSetMaxUnackedMessagesPerSubscription(Integer maxUnackedMessagesPerSubscription) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| if (maxUnackedMessagesPerSubscription != null && maxUnackedMessagesPerSubscription < 0) { |
| throw new RestException(Status.PRECONDITION_FAILED, |
| "maxUnackedMessagesPerSubscription must be 0 or more"); |
| } |
| try { |
| final String path = path(POLICIES, namespaceName.toString()); |
| updatePolicies(path, (policies) -> { |
| policies.max_unacked_messages_per_subscription = maxUnackedMessagesPerSubscription; |
| return policies; |
| }); |
| log.info("[{}] Successfully updated maxUnackedMessagesPerSubscription" |
| + " configuration: namespace={}, value={}", |
| clientAppId(), namespaceName, maxUnackedMessagesPerSubscription); |
| |
| } catch (RestException pfe) { |
| throw pfe; |
| } catch (Exception e) { |
| log.error("[{}] Failed to update maxUnackedMessagesPerSubscription configuration for namespace {}", |
| clientAppId(), namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected Long internalGetCompactionThreshold() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.COMPACTION, PolicyOperation.READ); |
| return getNamespacePolicies(namespaceName).compaction_threshold; |
| } |
| |
| protected void internalSetCompactionThreshold(Long newThreshold) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.COMPACTION, PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| |
| try { |
| if (newThreshold != null && newThreshold < 0) { |
| throw new RestException(Status.PRECONDITION_FAILED, |
| "compactionThreshold must be 0 or more"); |
| } |
| final String path = path(POLICIES, namespaceName.toString()); |
| updatePolicies(path, (policies) -> { |
| policies.compaction_threshold = newThreshold; |
| return policies; |
| }); |
| log.info("[{}] Successfully updated compactionThreshold configuration: namespace={}, value={}", |
| clientAppId(), namespaceName, newThreshold); |
| |
| } catch (RestException pfe) { |
| throw pfe; |
| } catch (Exception e) { |
| log.error("[{}] Failed to update compactionThreshold configuration for namespace {}", |
| clientAppId(), namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected long internalGetOffloadThreshold() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ); |
| Policies policies = getNamespacePolicies(namespaceName); |
| if (policies.offload_policies == null) { |
| return policies.offload_threshold; |
| } else { |
| return policies.offload_policies.getManagedLedgerOffloadThresholdInBytes(); |
| } |
| } |
| |
| protected void internalSetOffloadThreshold(long newThreshold) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| |
| try { |
| final String path = path(POLICIES, namespaceName.toString()); |
| updatePolicies(path, (policies) -> { |
| if (policies.offload_policies == null) { |
| policies.offload_policies = new OffloadPolicies(); |
| } |
| policies.offload_policies.setManagedLedgerOffloadThresholdInBytes(newThreshold); |
| policies.offload_threshold = newThreshold; |
| return policies; |
| }); |
| log.info("[{}] Successfully updated offloadThreshold configuration: namespace={}, value={}", |
| clientAppId(), namespaceName, newThreshold); |
| |
| } catch (RestException pfe) { |
| throw pfe; |
| } catch (Exception e) { |
| log.error("[{}] Failed to update offloadThreshold configuration for namespace {}", |
| clientAppId(), namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected Long internalGetOffloadDeletionLag() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ); |
| Policies policies = getNamespacePolicies(namespaceName); |
| if (policies.offload_policies == null) { |
| return policies.offload_deletion_lag_ms; |
| } else { |
| return policies.offload_policies.getManagedLedgerOffloadDeletionLagInMillis(); |
| } |
| } |
| |
| protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| try { |
| final String path = path(POLICIES, namespaceName.toString()); |
| updatePolicies(path, (policies) -> { |
| if (policies.offload_policies == null) { |
| policies.offload_policies = new OffloadPolicies(); |
| } |
| policies.offload_policies.setManagedLedgerOffloadDeletionLagInMillis(newDeletionLagMs); |
| policies.offload_deletion_lag_ms = newDeletionLagMs; |
| return policies; |
| }); |
| log.info("[{}] Successfully updated offloadDeletionLagMs configuration: namespace={}, value={}", |
| clientAppId(), namespaceName, newDeletionLagMs); |
| } catch (RestException pfe) { |
| throw pfe; |
| } catch (Exception e) { |
| log.error("[{}] Failed to update offloadDeletionLag configuration for namespace {}", clientAppId(), |
| namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| @Deprecated |
| protected SchemaAutoUpdateCompatibilityStrategy internalGetSchemaAutoUpdateCompatibilityStrategy() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, |
| PolicyOperation.READ); |
| return getNamespacePolicies(namespaceName).schema_auto_update_compatibility_strategy; |
| } |
| |
| protected SchemaCompatibilityStrategy internalGetSchemaCompatibilityStrategy() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, |
| PolicyOperation.READ); |
| Policies policies = getNamespacePolicies(namespaceName); |
| SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy; |
| if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) { |
| schemaCompatibilityStrategy = SchemaCompatibilityStrategy |
| .fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy); |
| } |
| return schemaCompatibilityStrategy; |
| } |
| |
| @Deprecated |
| protected void internalSetSchemaAutoUpdateCompatibilityStrategy(SchemaAutoUpdateCompatibilityStrategy strategy) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, |
| PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| |
| mutatePolicy((policies) -> { |
| policies.schema_auto_update_compatibility_strategy = strategy; |
| return policies; |
| }, (policies) -> policies.schema_auto_update_compatibility_strategy, |
| "schemaAutoUpdateCompatibilityStrategy"); |
| } |
| |
| protected void internalSetSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, |
| PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| |
| mutatePolicy((policies) -> { |
| policies.schema_compatibility_strategy = strategy; |
| return policies; |
| }, (policies) -> policies.schema_compatibility_strategy, |
| "schemaCompatibilityStrategy"); |
| } |
| |
| protected boolean internalGetSchemaValidationEnforced() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, |
| PolicyOperation.READ); |
| return getNamespacePolicies(namespaceName).schema_validation_enforced; |
| } |
| |
| protected void internalSetSchemaValidationEnforced(boolean schemaValidationEnforced) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, |
| PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| |
| mutatePolicy((policies) -> { |
| policies.schema_validation_enforced = schemaValidationEnforced; |
| return policies; |
| }, (policies) -> policies.schema_validation_enforced, |
| "schemaValidationEnforced"); |
| } |
| |
| protected boolean internalGetIsAllowAutoUpdateSchema() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, |
| PolicyOperation.READ); |
| return getNamespacePolicies(namespaceName).is_allow_auto_update_schema; |
| } |
| |
| protected void internalSetIsAllowAutoUpdateSchema(boolean isAllowAutoUpdateSchema) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, |
| PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| |
| mutatePolicy((policies) -> { |
| policies.is_allow_auto_update_schema = isAllowAutoUpdateSchema; |
| return policies; |
| }, (policies) -> policies.is_allow_auto_update_schema, |
| "isAllowAutoUpdateSchema"); |
| } |
| |
| protected Set<SubscriptionType> internalGetSubscriptionTypesEnabled() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.SUBSCRIPTION_AUTH_MODE, |
| PolicyOperation.READ); |
| Set<SubscriptionType> subscriptionTypes = new HashSet<>(); |
| getNamespacePolicies(namespaceName).subscription_types_enabled.forEach(subType -> |
| subscriptionTypes.add(SubscriptionType.valueOf(subType.name()))); |
| return subscriptionTypes; |
| } |
| |
| protected void internalSetSubscriptionTypesEnabled(Set<SubscriptionType> subscriptionTypesEnabled) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.SUBSCRIPTION_AUTH_MODE, |
| PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| Set<SubType> subTypes = new HashSet<>(); |
| subscriptionTypesEnabled.forEach(subscriptionType -> subTypes.add(SubType.valueOf(subscriptionType.name()))); |
| mutatePolicy((policies) -> { |
| policies.subscription_types_enabled = subTypes; |
| return policies; |
| }, (policies) -> policies.subscription_types_enabled, |
| "subscriptionTypesEnabled"); |
| } |
| |
| |
| private <T> void mutatePolicy(Function<Policies, Policies> policyTransformation, |
| Function<Policies, T> getter, |
| String policyName) { |
| try { |
| final String path = path(POLICIES, namespaceName.toString()); |
| MutableObject exception = new MutableObject(null); |
| MutableObject policiesObj = new MutableObject(null); |
| updatePolicies(path, (policies) -> { |
| try { |
| policies = policyTransformation.apply(policies); |
| } catch (Exception e) { |
| exception.setValue(e); |
| } |
| policiesObj.setValue(policies); |
| return policies; |
| }); |
| if (exception.getValue() != null) { |
| throw (Exception) exception.getValue(); |
| } |
| log.info("[{}] Successfully updated {} configuration: namespace={}, value={}", clientAppId(), policyName, |
| namespaceName, getter.apply((Policies) policiesObj.getValue())); |
| } catch (RestException pfe) { |
| throw pfe; |
| } catch (Exception e) { |
| log.error("[{}] Failed to update {} configuration for namespace {}", |
| clientAppId(), policyName, namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| protected void internalSetOffloadPolicies(AsyncResponse asyncResponse, OffloadPolicies offloadPolicies) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| validateOffloadPolicies(offloadPolicies); |
| |
| try { |
| final String path = path(POLICIES, namespaceName.toString()); |
| namespaceResources().setAsync(path, (policies) -> { |
| if (Objects.equals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(), |
| OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS)) { |
| offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(policies.offload_deletion_lag_ms); |
| } else { |
| policies.offload_deletion_lag_ms = offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(); |
| } |
| if (Objects.equals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(), |
| OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES)) { |
| offloadPolicies.setManagedLedgerOffloadThresholdInBytes(policies.offload_threshold); |
| } else { |
| policies.offload_threshold = offloadPolicies.getManagedLedgerOffloadThresholdInBytes(); |
| } |
| policies.offload_policies = offloadPolicies; |
| return policies; |
| }).thenApply(r -> { |
| log.info("[{}] Successfully updated offload configuration: namespace={}, map={}", clientAppId(), |
| namespaceName, offloadPolicies); |
| asyncResponse.resume(Response.noContent().build()); |
| return null; |
| }).exceptionally(e -> { |
| log.error("[{}] Failed to update offload configuration for namespace {}", clientAppId(), namespaceName, |
| e); |
| asyncResponse.resume(new RestException(e)); |
| return null; |
| }); |
| } catch (Exception e) { |
| log.error("[{}] Failed to update offload configuration for namespace {}", clientAppId(), namespaceName, e); |
| asyncResponse.resume(e.getCause() instanceof NotFoundException |
| ? new RestException(Status.CONFLICT, "Concurrent modification") |
| : new RestException(e)); |
| } |
| } |
| |
| protected void internalRemoveOffloadPolicies(AsyncResponse asyncResponse) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| try { |
| final String path = path(POLICIES, namespaceName.toString()); |
| namespaceResources().setAsync(path, (policies) -> { |
| policies.offload_policies = null; |
| return policies; |
| }).thenApply(r -> { |
| log.info("[{}] Successfully remove offload configuration: namespace={}", clientAppId(), namespaceName); |
| asyncResponse.resume(Response.noContent().build()); |
| return null; |
| }).exceptionally(e -> { |
| log.error("[{}] Failed to remove offload configuration for namespace {}", clientAppId(), namespaceName, |
| e); |
| asyncResponse.resume(e.getCause() instanceof NotFoundException |
| ? new RestException(Status.CONFLICT, "Concurrent modification") |
| : new RestException(e)); |
| return null; |
| }); |
| } catch (Exception e) { |
| log.error("[{}] Failed to remove offload configuration for namespace {}", clientAppId(), namespaceName, e); |
| asyncResponse.resume(new RestException(e)); |
| } |
| } |
| |
| private void validateOffloadPolicies(OffloadPolicies offloadPolicies) { |
| if (offloadPolicies == null) { |
| log.warn("[{}] Failed to update offload configuration for namespace {}: offloadPolicies is null", |
| clientAppId(), namespaceName); |
| throw new RestException(Status.PRECONDITION_FAILED, |
| "The offloadPolicies must be specified for namespace offload."); |
| } |
| if (!offloadPolicies.driverSupported()) { |
| log.warn("[{}] Failed to update offload configuration for namespace {}: " |
| + "driver is not supported, support value: {}", |
| clientAppId(), namespaceName, OffloadPolicies.getSupportedDriverNames()); |
| throw new RestException(Status.PRECONDITION_FAILED, |
| "The driver is not supported, support value: " + OffloadPolicies.getSupportedDriverNames()); |
| } |
| if (!offloadPolicies.bucketValid()) { |
| log.warn("[{}] Failed to update offload configuration for namespace {}: bucket must be specified", |
| clientAppId(), namespaceName); |
| throw new RestException(Status.PRECONDITION_FAILED, |
| "The bucket must be specified for namespace offload."); |
| } |
| } |
| |
| protected OffloadPolicies internalGetOffloadPolicies() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ); |
| |
| Policies policies = getNamespacePolicies(namespaceName); |
| return policies.offload_policies; |
| } |
| |
| protected int internalGetMaxTopicsPerNamespace() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, PolicyOperation.READ); |
| return getNamespacePolicies(namespaceName).max_topics_per_namespace; |
| } |
| |
| protected void internalRemoveMaxTopicsPerNamespace() { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, PolicyOperation.WRITE); |
| internalSetMaxTopicsPerNamespace(null); |
| } |
| |
| protected void internalSetMaxTopicsPerNamespace(Integer maxTopicsPerNamespace) { |
| validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, PolicyOperation.WRITE); |
| validatePoliciesReadOnlyAccess(); |
| |
| if (maxTopicsPerNamespace != null && maxTopicsPerNamespace < 0) { |
| throw new RestException(Status.PRECONDITION_FAILED, |
| "maxTopicsPerNamespace must be 0 or more"); |
| } |
| internalSetPolicies("max_topics_per_namespace", maxTopicsPerNamespace); |
| } |
| |
| private void updatePolicies(String path, Function<Policies, Policies> updateFunction) { |
| try { |
| // Force to read the data s.t. the watch to the cache content is setup. |
| namespaceResources().set(path, updateFunction); |
| log.info("[{}] Successfully updated the {} on namespace {}", clientAppId(), path, namespaceName); |
| } catch (NotFoundException e) { |
| log.warn("[{}] Namespace {}: does not exist", clientAppId(), namespaceName); |
| throw new RestException(Status.NOT_FOUND, "Namespace does not exist"); |
| } catch (BadVersionException e) { |
| log.warn("[{}] Failed to update the replication clusters on" + " namespace {} : concurrent modification", |
| clientAppId(), namespaceName); |
| |
| throw new RestException(Status.CONFLICT, "Concurrent modification"); |
| } catch (Exception e) { |
| log.error("[{}] Failed to update namespace policies {}", clientAppId(), namespaceName, e); |
| throw new RestException(e); |
| } |
| } |
| |
| private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class); |
| } |