blob: 1cbcfaa487779c81d650601dfcb9dd54236b0028 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.resources;
import com.fasterxml.jackson.core.type.TypeReference;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Getter
public class NamespaceResources extends BaseResources<Policies> {
private static final Logger log = LoggerFactory.getLogger(NamespaceResources.class);
private final IsolationPolicyResources isolationPolicies;
private final PartitionedTopicResources partitionedTopicResources;
private final MetadataStore configurationStore;
private final MetadataCache<LocalPolicies> localPoliciesCache;
private static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly";
private static final String NAMESPACE_BASE_PATH = "/namespace";
private static final String BUNDLE_DATA_BASE_PATH = "/loadbalance/bundle-data";
public NamespaceResources(MetadataStore localStore, MetadataStore configurationStore, int operationTimeoutSec) {
super(configurationStore, Policies.class, operationTimeoutSec);
this.configurationStore = configurationStore;
isolationPolicies = new IsolationPolicyResources(configurationStore, operationTimeoutSec);
partitionedTopicResources = new PartitionedTopicResources(configurationStore, operationTimeoutSec);
if (localStore != null) {
localPoliciesCache = localStore.getMetadataCache(LocalPolicies.class);
} else {
localPoliciesCache = null;
}
}
public CompletableFuture<List<String>> listNamespacesAsync(String tenant) {
return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant));
}
public boolean getPoliciesReadOnly() throws MetadataStoreException {
return super.exists(POLICIES_READONLY_FLAG_PATH);
}
public void createPolicies(NamespaceName ns, Policies policies) throws MetadataStoreException{
create(joinPath(BASE_POLICIES_PATH, ns.toString()), policies);
}
public boolean namespaceExists(NamespaceName ns) throws MetadataStoreException {
String path = joinPath(BASE_POLICIES_PATH, ns.toString());
return super.exists(path) && super.getChildren(path).isEmpty();
}
public CompletableFuture<Boolean> namespaceExistsAsync(NamespaceName ns) {
String path = joinPath(BASE_POLICIES_PATH, ns.toString());
return getCache().exists(path)
.thenCompose(exists -> {
if (!exists) {
return CompletableFuture.completedFuture(false);
} else {
return getChildrenAsync(path).thenApply(children -> children.isEmpty());
}
});
}
public void deletePolicies(NamespaceName ns) throws MetadataStoreException{
delete(joinPath(BASE_POLICIES_PATH, ns.toString()));
}
public CompletableFuture<Void> deletePoliciesAsync(NamespaceName ns){
return deleteAsync(joinPath(BASE_POLICIES_PATH, ns.toString()));
}
public Optional<Policies> getPolicies(NamespaceName ns) throws MetadataStoreException{
return get(joinPath(BASE_POLICIES_PATH, ns.toString()));
}
public Optional<Policies> getPoliciesIfCached(NamespaceName ns) {
return getCache().getIfCached(joinPath(BASE_POLICIES_PATH, ns.toString()));
}
public CompletableFuture<Optional<Policies>> getPoliciesAsync(NamespaceName ns) {
return getCache().get(joinPath(BASE_POLICIES_PATH, ns.toString()));
}
public void setPolicies(NamespaceName ns, Function<Policies, Policies> function) throws MetadataStoreException {
set(joinPath(BASE_POLICIES_PATH, ns.toString()), function);
}
public CompletableFuture<Void> setPoliciesAsync(NamespaceName ns, Function<Policies, Policies> function) {
return setAsync(joinPath(BASE_POLICIES_PATH, ns.toString()), function);
}
public static boolean pathIsFromNamespace(String path) {
return path.startsWith(BASE_POLICIES_PATH + "/")
&& path.substring(BASE_POLICIES_PATH.length() + 1).contains("/");
}
// clear resource of `/namespace/{namespaceName}` for zk-node
public CompletableFuture<Void> deleteNamespaceAsync(NamespaceName ns) {
final String namespacePath = joinPath(NAMESPACE_BASE_PATH, ns.toString());
return deleteIfExistsAsync(namespacePath);
}
// clear resource of `/namespace/{tenant}` for zk-node
public CompletableFuture<Void> deleteTenantAsync(String tenant) {
final String tenantPath = joinPath(NAMESPACE_BASE_PATH, tenant);
return deleteIfExistsAsync(tenantPath);
}
public static NamespaceName namespaceFromPath(String path) {
return NamespaceName.get(path.substring(BASE_POLICIES_PATH.length() + 1));
}
public static class IsolationPolicyResources extends BaseResources<Map<String, NamespaceIsolationDataImpl>> {
private static final String NAMESPACE_ISOLATION_POLICIES = "namespaceIsolationPolicies";
public IsolationPolicyResources(MetadataStore store, int operationTimeoutSec) {
super(store, new TypeReference<Map<String, NamespaceIsolationDataImpl>>() {
}, operationTimeoutSec);
}
public Optional<NamespaceIsolationPolicies> getIsolationDataPolicies(String cluster)
throws MetadataStoreException {
Optional<Map<String, NamespaceIsolationDataImpl>> data =
super.get(joinPath(BASE_CLUSTERS_PATH, cluster, NAMESPACE_ISOLATION_POLICIES));
return data.isPresent() ? Optional.of(new NamespaceIsolationPolicies(data.get())) : Optional.empty();
}
public void deleteIsolationData(String cluster) throws MetadataStoreException {
delete(joinPath(BASE_CLUSTERS_PATH, cluster, NAMESPACE_ISOLATION_POLICIES));
}
public void createIsolationData(String cluster, Map<String, NamespaceIsolationDataImpl> id)
throws MetadataStoreException {
create(joinPath(BASE_CLUSTERS_PATH, cluster, NAMESPACE_ISOLATION_POLICIES), id);
}
public void setIsolationData(String cluster,
Function<Map<String, NamespaceIsolationDataImpl>, Map<String,
NamespaceIsolationDataImpl>> modifyFunction)
throws MetadataStoreException {
set(joinPath(BASE_CLUSTERS_PATH, cluster, NAMESPACE_ISOLATION_POLICIES), modifyFunction);
}
public void setIsolationDataWithCreate(String cluster,
Function<Optional<Map<String, NamespaceIsolationDataImpl>>, Map<String,
NamespaceIsolationDataImpl>> createFunction)
throws MetadataStoreException {
setWithCreate(joinPath(BASE_CLUSTERS_PATH, cluster, NAMESPACE_ISOLATION_POLICIES), createFunction);
}
}
public static class PartitionedTopicResources extends BaseResources<PartitionedTopicMetadata> {
private static final String PARTITIONED_TOPIC_PATH = "/admin/partitioned-topics";
public PartitionedTopicResources(MetadataStore configurationStore, int operationTimeoutSec) {
super(configurationStore, PartitionedTopicMetadata.class, operationTimeoutSec);
}
public CompletableFuture<Void> updatePartitionedTopicAsync(TopicName tn, Function<PartitionedTopicMetadata,
PartitionedTopicMetadata> f) {
return setAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
tn.getEncodedLocalName()), f);
}
public void createPartitionedTopic(TopicName tn, PartitionedTopicMetadata tm) throws MetadataStoreException {
create(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
tn.getEncodedLocalName()), tm);
}
public CompletableFuture<Void> createPartitionedTopicAsync(TopicName tn, PartitionedTopicMetadata tm) {
return createAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
tn.getEncodedLocalName()), tm);
}
public CompletableFuture<List<String>> listPartitionedTopicsAsync(NamespaceName ns, TopicDomain domain) {
return getChildrenAsync(joinPath(PARTITIONED_TOPIC_PATH, ns.toString(), domain.value()))
.thenApply(list ->
list.stream().map(x -> TopicName.get(domain.value(), ns, Codec.decode(x)).toString())
.collect(Collectors.toList())
);
}
public CompletableFuture<Optional<PartitionedTopicMetadata>> getPartitionedTopicMetadataAsync(TopicName tn) {
return getAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
tn.getEncodedLocalName()));
}
public boolean partitionedTopicExists(TopicName tn) throws MetadataStoreException {
return exists(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
tn.getEncodedLocalName()));
}
public CompletableFuture<Boolean> partitionedTopicExistsAsync(TopicName tn) {
return existsAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
tn.getEncodedLocalName()));
}
public CompletableFuture<Void> deletePartitionedTopicAsync(TopicName tn) {
return deleteAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
tn.getEncodedLocalName()));
}
public CompletableFuture<Void> clearPartitionedTopicMetadataAsync(NamespaceName namespaceName) {
final String globalPartitionedPath = joinPath(PARTITIONED_TOPIC_PATH, namespaceName.toString());
return getStore().deleteRecursive(globalPartitionedPath);
}
public CompletableFuture<Void> clearPartitionedTopicTenantAsync(String tenant) {
final String partitionedTopicPath = joinPath(PARTITIONED_TOPIC_PATH, tenant);
return deleteIfExistsAsync(partitionedTopicPath);
}
}
// clear resource of `/loadbalance/bundle-data/{tenant}/{namespace}/` in metadata-store
public CompletableFuture<Void> deleteBundleDataAsync(NamespaceName ns) {
final String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, ns.toString());
return getStore().deleteRecursive(namespaceBundlePath);
}
// clear resource of `/loadbalance/bundle-data/{tenant}/` in metadata-store
public CompletableFuture<Void> deleteBundleDataTenantAsync(String tenant) {
final String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, tenant);
return getStore().deleteRecursive(tenantBundlePath);
}
}