blob: ce0f2211918a8f9222f895601b6ec33b3aa3d260 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin.impl;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.namespace.NamespaceService.NAMESPACE_ISOLATION_POLICIES;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Example;
import io.swagger.annotations.ExampleProperty;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.resources.ClusterResources.FailureDomainResources;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClustersBase extends PulsarWebResource {
@GET
@ApiOperation(
value = "Get the list of all the Pulsar clusters.",
response = String.class,
responseContainer = "Set")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Return a list of clusters."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public Set<String> getClusters() throws Exception {
try {
// Remove "global" cluster from returned list
Set<String> clusters = clusterResources().list().stream()
.filter(cluster -> !Constants.GLOBAL_CLUSTER.equals(cluster)).collect(Collectors.toSet());
return clusters;
} catch (Exception e) {
log.error("[{}] Failed to get clusters list", clientAppId(), e);
throw new RestException(e);
}
}
@GET
@Path("/{cluster}")
@ApiOperation(
value = "Get the configuration for the specified cluster.",
response = ClusterData.class,
notes = "This operation requires Pulsar superuser privileges."
)
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Return the cluster data.", response = ClusterData.class),
@ApiResponse(code = 403, message = "Don't have admin permission."),
@ApiResponse(code = 404, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public ClusterData getCluster(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster
) {
validateSuperUserAccess();
try {
return clusterResources().get(path("clusters", cluster))
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster does not exist"));
} catch (Exception e) {
log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, e);
if (e instanceof RestException) {
throw (RestException) e;
} else {
throw new RestException(e);
}
}
}
@PUT
@Path("/{cluster}")
@ApiOperation(
value = "Create a new cluster.",
notes = "This operation requires Pulsar superuser privileges, and the name cannot contain the '/' characters."
)
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Cluster has been created."),
@ApiResponse(code = 403, message = "You don't have admin permission to create the cluster."),
@ApiResponse(code = 409, message = "Cluster already exists."),
@ApiResponse(code = 412, message = "Cluster name is not valid."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public void createCluster(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster,
@ApiParam(
value = "The cluster data",
required = true,
examples = @Example(
value = @ExampleProperty(
mediaType = MediaType.APPLICATION_JSON,
value =
"{\n"
+ " 'serviceUrl': 'http://pulsar.example.com:8080',\n"
+ " 'brokerServiceUrl': 'pulsar://pulsar.example.com:6651',\n"
+ "}"
)
)
)
ClusterData clusterData
) {
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();
try {
NamedEntity.checkName(cluster);
if (clusterResources().get(path("clusters", cluster)).isPresent()) {
log.warn("[{}] Failed to create already existing cluster {}", clientAppId(), cluster);
throw new RestException(Status.CONFLICT, "Cluster already exists");
}
clusterResources().create(path("clusters", cluster), clusterData);
log.info("[{}] Created cluster {}", clientAppId(), cluster);
} catch (IllegalArgumentException e) {
log.warn("[{}] Failed to create cluster with invalid name {}", clientAppId(), cluster, e);
throw new RestException(Status.PRECONDITION_FAILED, "Cluster name is not valid");
} catch (Exception e) {
log.error("[{}] Failed to create cluster {}", clientAppId(), cluster, e);
throw new RestException(e);
}
}
@POST
@Path("/{cluster}")
@ApiOperation(
value = "Update the configuration for a cluster.",
notes = "This operation requires Pulsar superuser privileges.")
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Cluster has been updated."),
@ApiResponse(code = 403, message = "Don't have admin permission or policies are read-only."),
@ApiResponse(code = 404, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public void updateCluster(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster,
@ApiParam(
value = "The cluster data",
required = true,
examples = @Example(
value = @ExampleProperty(
mediaType = MediaType.APPLICATION_JSON,
value =
"{\n"
+ " 'serviceUrl': 'http://pulsar.example.com:8080',\n"
+ " 'brokerServiceUrl': 'pulsar://pulsar.example.com:6651'\n"
+ "}"
)
)
)
ClusterData clusterData
) {
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();
try {
clusterResources().set(path("clusters", cluster), old -> {
old.update(clusterData);
return old;
});
log.info("[{}] Updated cluster {}", clientAppId(), cluster);
} catch (NotFoundException e) {
log.warn("[{}] Failed to update cluster {}: Does not exist", clientAppId(), cluster);
throw new RestException(Status.NOT_FOUND, "Cluster does not exist");
} catch (Exception e) {
log.error("[{}] Failed to update cluster {}", clientAppId(), cluster, e);
throw new RestException(e);
}
}
@POST
@Path("/{cluster}/peers")
@ApiOperation(
value = "Update peer-cluster-list for a cluster.",
notes = "This operation requires Pulsar superuser privileges.")
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Cluster has been updated."),
@ApiResponse(code = 403, message = "Don't have admin permission or policies are read-only."),
@ApiResponse(code = 404, message = "Cluster doesn't exist."),
@ApiResponse(code = 412, message = "Peer cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public void setPeerClusterNames(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster,
@ApiParam(
value = "The list of peer cluster names",
required = true,
examples = @Example(
value = @ExampleProperty(
mediaType = MediaType.APPLICATION_JSON,
value =
"[\n"
+ " 'cluster-a',\n"
+ " 'cluster-b'\n"
+ "]"
)
)
)
LinkedHashSet<String> peerClusterNames
) {
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();
// validate if peer-cluster exist
if (peerClusterNames != null && !peerClusterNames.isEmpty()) {
for (String peerCluster : peerClusterNames) {
try {
if (cluster.equalsIgnoreCase(peerCluster)) {
throw new RestException(Status.PRECONDITION_FAILED,
cluster + " itself can't be part of peer-list");
}
clusterResources().get(path("clusters", peerCluster))
.orElseThrow(() -> new RestException(Status.PRECONDITION_FAILED,
"Peer cluster " + peerCluster + " does not exist"));
} catch (RestException e) {
log.warn("[{}] Peer cluster doesn't exist from {}, {}", clientAppId(), peerClusterNames,
e.getMessage());
throw e;
} catch (Exception e) {
log.warn("[{}] Failed to validate peer-cluster list {}, {}", clientAppId(), peerClusterNames,
e.getMessage());
throw new RestException(e);
}
}
}
try {
clusterResources().set(path("clusters", cluster), old -> {
old.setPeerClusterNames(peerClusterNames);
return old;
});
log.info("[{}] Successfully added peer-cluster {} for {}", clientAppId(), peerClusterNames, cluster);
} catch (NotFoundException e) {
log.warn("[{}] Failed to update cluster {}: Does not exist", clientAppId(), cluster);
throw new RestException(Status.NOT_FOUND, "Cluster does not exist");
} catch (Exception e) {
log.error("[{}] Failed to update cluster {}", clientAppId(), cluster, e);
throw new RestException(e);
}
}
@GET
@Path("/{cluster}/peers")
@ApiOperation(
value = "Get the peer-cluster data for the specified cluster.",
response = String.class,
responseContainer = "Set",
notes = "This operation requires Pulsar superuser privileges."
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission."),
@ApiResponse(code = 404, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public Set<String> getPeerCluster(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster
) {
validateSuperUserAccess();
try {
ClusterData clusterData = clusterResources().get(path("clusters", cluster))
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster does not exist"));
return clusterData.getPeerClusterNames();
} catch (Exception e) {
log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, e);
throw new RestException(e);
}
}
@DELETE
@Path("/{cluster}")
@ApiOperation(
value = "Delete an existing cluster.",
notes = "This operation requires Pulsar superuser privileges."
)
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Cluster has been deleted."),
@ApiResponse(code = 403, message = "Don't have admin permission or policies are read-only."),
@ApiResponse(code = 404, message = "Cluster doesn't exist."),
@ApiResponse(code = 412, message = "Cluster is not empty."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public void deleteCluster(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster
) {
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();
// Check that the cluster is not used by any property (eg: no namespaces provisioned there)
boolean isClusterUsed = false;
try {
for (String property : tenantResources().getChildren(path(POLICIES))) {
if (!clusterResources().exists(path(POLICIES, property, cluster))) {
continue;
}
if (!clusterResources().getChildren(path(POLICIES, property, cluster)).isEmpty()) {
// We found a property that has at least a namespace in this cluster
isClusterUsed = true;
break;
}
}
// check the namespaceIsolationPolicies associated with the cluster
String path = path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES);
Optional<NamespaceIsolationPolicies> nsIsolationPolicies = namespaceIsolationPolicies().getPolicies(path);
// Need to delete the isolation policies if present
if (nsIsolationPolicies.isPresent()) {
if (nsIsolationPolicies.get().getPolicies().isEmpty()) {
namespaceIsolationPolicies().delete(path);
} else {
isClusterUsed = true;
}
}
} catch (Exception e) {
log.error("[{}] Failed to get cluster usage {}", clientAppId(), cluster, e);
throw new RestException(e);
}
if (isClusterUsed) {
log.warn("[{}] Failed to delete cluster {} - Cluster not empty", clientAppId(), cluster);
throw new RestException(Status.PRECONDITION_FAILED, "Cluster not empty");
}
try {
String clusterPath = path("clusters", cluster);
deleteFailureDomain(clusterPath);
clusterResources().delete(clusterPath);
log.info("[{}] Deleted cluster {}", clientAppId(), cluster);
} catch (NotFoundException e) {
log.warn("[{}] Failed to delete cluster {} - Does not exist", clientAppId(), cluster);
throw new RestException(Status.NOT_FOUND, "Cluster does not exist");
} catch (Exception e) {
log.error("[{}] Failed to delete cluster {}", clientAppId(), cluster, e);
throw new RestException(e);
}
}
private void deleteFailureDomain(String clusterPath) {
try {
String failureDomain = joinPath(clusterPath, ConfigurationCacheService.FAILURE_DOMAIN);
if (!clusterResources().exists(failureDomain)) {
return;
}
for (String domain : clusterResources().getChildren(failureDomain)) {
String domainPath = joinPath(failureDomain, domain);
clusterResources().delete(domainPath);
}
clusterResources().delete(failureDomain);
} catch (Exception e) {
log.warn("Failed to delete failure-domain under cluster {}", clusterPath);
throw new RestException(e);
}
}
@GET
@Path("/{cluster}/namespaceIsolationPolicies")
@ApiOperation(
value = "Get the namespace isolation policies assigned to the cluster.",
response = NamespaceIsolationData.class,
responseContainer = "Map",
notes = "This operation requires Pulsar superuser privileges."
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission."),
@ApiResponse(code = 404, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public Map<String, NamespaceIsolationData> getNamespaceIsolationPolicies(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster
) throws Exception {
validateSuperUserAccess();
if (!clusterResources().exists(path("clusters", cluster))) {
throw new RestException(Status.NOT_FOUND, "Cluster " + cluster + " does not exist.");
}
try {
NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies()
.getPolicies(path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES))
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
// construct the response to Namespace isolation data map
return nsIsolationPolicies.getPolicies();
} catch (Exception e) {
log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies", clientAppId(), cluster, e);
throw new RestException(e);
}
}
@GET
@Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
@ApiOperation(
value = "Get the single namespace isolation policy assigned to the cluster.",
response = NamespaceIsolationData.class,
notes = "This operation requires Pulsar superuser privileges."
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission."),
@ApiResponse(code = 404, message = "Policy doesn't exist."),
@ApiResponse(code = 412, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public NamespaceIsolationData getNamespaceIsolationPolicy(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster,
@ApiParam(
value = "The name of the namespace isolation policy",
required = true
)
@PathParam("policyName") String policyName
) throws Exception {
validateSuperUserAccess();
validateClusterExists(cluster);
try {
NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies()
.getPolicies(path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES))
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
// construct the response to Namespace isolation data map
if (!nsIsolationPolicies.getPolicies().containsKey(policyName)) {
log.info("[{}] Cannot find NamespaceIsolationPolicy {} for cluster {}",
clientAppId(), policyName, cluster);
throw new RestException(Status.NOT_FOUND,
"Cannot find NamespaceIsolationPolicy " + policyName + " for cluster " + cluster);
}
return nsIsolationPolicies.getPolicies().get(policyName);
} catch (RestException re) {
throw re;
} catch (Exception e) {
log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies/{}", clientAppId(), cluster, e);
throw new RestException(e);
}
}
@GET
@Path("/{cluster}/namespaceIsolationPolicies/brokers")
@ApiOperation(
value = "Get list of brokers with namespace-isolation policies attached to them.",
response = BrokerNamespaceIsolationData.class,
responseContainer = "set",
notes = "This operation requires Pulsar superuser privileges."
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission."),
@ApiResponse(code = 404, message = "Namespace-isolation policies not found."),
@ApiResponse(code = 412, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public List<BrokerNamespaceIsolationData> getBrokersWithNamespaceIsolationPolicy(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster) {
validateSuperUserAccess();
validateClusterExists(cluster);
Set<String> availableBrokers;
final String nsIsolationPoliciesPath = AdminResource.path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES);
Map<String, NamespaceIsolationData> nsPolicies;
try {
availableBrokers = pulsar().getLoadManager().get().getAvailableBrokers();
} catch (Exception e) {
log.error("[{}] Failed to get list of brokers in cluster {}", clientAppId(), cluster, e);
throw new RestException(e);
}
try {
Optional<NamespaceIsolationPolicies> nsPoliciesResult = namespaceIsolationPolicies()
.getPolicies(nsIsolationPoliciesPath);
if (!nsPoliciesResult.isPresent()) {
throw new RestException(Status.NOT_FOUND, "namespace-isolation policies not found for " + cluster);
}
nsPolicies = nsPoliciesResult.get().getPolicies();
} catch (Exception e) {
log.error("[{}] Failed to get namespace isolation-policies {}", clientAppId(), cluster, e);
throw new RestException(e);
}
return availableBrokers.stream().map(broker -> {
BrokerNamespaceIsolationData brokerIsolationData = new BrokerNamespaceIsolationData();
brokerIsolationData.brokerName = broker;
if (nsPolicies != null) {
nsPolicies.forEach((name, policyData) -> {
NamespaceIsolationPolicyImpl nsPolicyImpl = new NamespaceIsolationPolicyImpl(policyData);
if (nsPolicyImpl.isPrimaryBroker(broker) || nsPolicyImpl.isSecondaryBroker(broker)) {
if (brokerIsolationData.namespaceRegex == null) {
brokerIsolationData.namespaceRegex = Lists.newArrayList();
}
brokerIsolationData.namespaceRegex.addAll(policyData.namespaces);
if (nsPolicyImpl.isPrimaryBroker(broker)) {
brokerIsolationData.isPrimary = true;
}
}
});
}
return brokerIsolationData;
}).collect(Collectors.toList());
}
@GET
@Path("/{cluster}/namespaceIsolationPolicies/brokers/{broker}")
@ApiOperation(
value = "Get a broker with namespace-isolation policies attached to it.",
response = BrokerNamespaceIsolationData.class,
notes = "This operation requires Pulsar superuser privileges."
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission."),
@ApiResponse(code = 404, message = "Namespace-isolation policies/ Broker not found."),
@ApiResponse(code = 412, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public BrokerNamespaceIsolationData getBrokerWithNamespaceIsolationPolicy(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster,
@ApiParam(
value = "The broker name (<broker-hostname>:<web-service-port>)",
required = true,
example = "broker1:8080"
)
@PathParam("broker") String broker) {
validateSuperUserAccess();
validateClusterExists(cluster);
final String nsIsolationPoliciesPath = AdminResource.path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES);
Map<String, NamespaceIsolationData> nsPolicies;
try {
Optional<NamespaceIsolationPolicies> nsPoliciesResult = namespaceIsolationPolicies()
.getPolicies(nsIsolationPoliciesPath);
if (!nsPoliciesResult.isPresent()) {
throw new RestException(Status.NOT_FOUND, "namespace-isolation policies not found for " + cluster);
}
nsPolicies = nsPoliciesResult.get().getPolicies();
} catch (Exception e) {
log.error("[{}] Failed to get namespace isolation-policies {}", clientAppId(), cluster, e);
throw new RestException(e);
}
BrokerNamespaceIsolationData brokerIsolationData = new BrokerNamespaceIsolationData();
brokerIsolationData.brokerName = broker;
if (nsPolicies != null) {
nsPolicies.forEach((name, policyData) -> {
NamespaceIsolationPolicyImpl nsPolicyImpl = new NamespaceIsolationPolicyImpl(policyData);
boolean isPrimary = nsPolicyImpl.isPrimaryBroker(broker);
if (isPrimary || nsPolicyImpl.isSecondaryBroker(broker)) {
if (brokerIsolationData.namespaceRegex == null) {
brokerIsolationData.namespaceRegex = Lists.newArrayList();
}
brokerIsolationData.namespaceRegex.addAll(policyData.namespaces);
brokerIsolationData.isPrimary = isPrimary;
brokerIsolationData.policyName = name;
}
});
}
return brokerIsolationData;
}
@POST
@Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
@ApiOperation(
value = "Set namespace isolation policy.",
notes = "This operation requires Pulsar superuser privileges."
)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Namespace isolation policy data is invalid."),
@ApiResponse(code = 403, message = "Don't have admin permission or policies are read-only."),
@ApiResponse(code = 404, message = "Namespace isolation policy doesn't exist."),
@ApiResponse(code = 412, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public void setNamespaceIsolationPolicy(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster,
@ApiParam(
value = "The namespace isolation policy name",
required = true
)
@PathParam("policyName") String policyName,
@ApiParam(
value = "The namespace isolation policy data",
required = true
)
NamespaceIsolationData policyData
) {
validateSuperUserAccess();
validateClusterExists(cluster);
validatePoliciesReadOnlyAccess();
String jsonInput = null;
try {
// validate the policy data before creating the node
policyData.validate();
jsonInput = ObjectMapperFactory.create().writeValueAsString(policyData);
String nsIsolationPolicyPath = path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES);
NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies()
.getPolicies(nsIsolationPolicyPath).orElseGet(() -> {
try {
namespaceIsolationPolicies().setWithCreate(nsIsolationPolicyPath,
(p) -> Collections.emptyMap());
return new NamespaceIsolationPolicies();
} catch (Exception e) {
throw new RestException(e);
}
});
nsIsolationPolicies.setPolicy(policyName, policyData);
namespaceIsolationPolicies().set(nsIsolationPolicyPath, old -> nsIsolationPolicies.getPolicies());
// whether or not make the isolation update on time.
if (pulsar().getConfiguration().isEnableNamespaceIsolationUpdateOnTime()) {
filterAndUnloadMatchedNameSpaces(asyncResponse, policyData);
} else {
asyncResponse.resume(Response.noContent().build());
return;
}
} catch (IllegalArgumentException iae) {
log.info("[{}] Failed to update clusters/{}/namespaceIsolationPolicies/{}. Input data is invalid",
clientAppId(), cluster, policyName, iae);
asyncResponse.resume(new RestException(Status.BAD_REQUEST,
"Invalid format of input policy data. policy: " + policyName + "; data: " + jsonInput));
} catch (NotFoundException nne) {
log.warn("[{}] Failed to update clusters/{}/namespaceIsolationPolicies: Does not exist", clientAppId(),
cluster);
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
} catch (Exception e) {
log.error("[{}] Failed to update clusters/{}/namespaceIsolationPolicies/{}", clientAppId(), cluster,
policyName, e);
asyncResponse.resume(new RestException(e));
}
}
// get matched namespaces; call unload for each namespaces;
private void filterAndUnloadMatchedNameSpaces(AsyncResponse asyncResponse,
NamespaceIsolationData policyData) throws Exception {
Namespaces namespaces = pulsar().getAdminClient().namespaces();
List<String> nssToUnload = Lists.newArrayList();
pulsar().getAdminClient().tenants().getTenantsAsync()
.whenComplete((tenants, ex) -> {
if (ex != null) {
log.error("[{}] Failed to get tenants when setNamespaceIsolationPolicy.", clientAppId(), ex);
return;
}
AtomicInteger tenantsNumber = new AtomicInteger(tenants.size());
// get all tenants now, for each tenants, get its namespaces
tenants.forEach(tenant -> namespaces.getNamespacesAsync(tenant)
.whenComplete((nss, e) -> {
int leftTenantsToHandle = tenantsNumber.decrementAndGet();
if (ex != null) {
log.error("[{}] Failed to get namespaces for tenant {} when setNamespaceIsolationPolicy.",
clientAppId(), tenant, ex);
if (leftTenantsToHandle == 0) {
unloadMatchedNamespacesList(asyncResponse, nssToUnload, namespaces);
}
return;
}
AtomicInteger nssNumber = new AtomicInteger(nss.size());
// get all namespaces for this tenant now.
nss.forEach(namespaceName -> {
int leftNssToHandle = nssNumber.decrementAndGet();
// if namespace match any policy regex, add it to ns list to be unload.
if (policyData.namespaces.stream()
.anyMatch(nsnameRegex -> namespaceName.matches(nsnameRegex))) {
nssToUnload.add(namespaceName);
}
// all the tenants & namespaces get filtered.
if (leftNssToHandle == 0 && leftTenantsToHandle == 0) {
unloadMatchedNamespacesList(asyncResponse, nssToUnload, namespaces);
}
});
}));
});
}
private void unloadMatchedNamespacesList(AsyncResponse asyncResponse,
List<String> nssToUnload,
Namespaces namespaces) {
if (nssToUnload.size() == 0) {
asyncResponse.resume(Response.noContent().build());
return;
}
List<CompletableFuture<Void>> futures = nssToUnload.stream()
.map(namespaceName -> namespaces.unloadAsync(namespaceName))
.collect(Collectors.toList());
FutureUtil.waitForAll(futures).whenComplete((result, exception) -> {
if (exception != null) {
log.error("[{}] Failed to unload namespace while setNamespaceIsolationPolicy.",
clientAppId(), exception);
asyncResponse.resume(new RestException(exception));
return;
}
try {
// write load info to load manager to make the load happens fast
pulsar().getLoadManager().get().writeLoadReportOnZookeeper(true);
} catch (Exception e) {
log.warn("[{}] Failed to writeLoadReportOnZookeeper.", clientAppId(), e);
}
asyncResponse.resume(Response.noContent().build());
return;
});
}
@DELETE
@Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
@ApiOperation(
value = "Delete namespace isolation policy.",
notes = "This operation requires Pulsar superuser privileges."
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission or policies are read only."),
@ApiResponse(code = 404, message = "Namespace isolation policy doesn't exist."),
@ApiResponse(code = 412, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public void deleteNamespaceIsolationPolicy(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster,
@ApiParam(
value = "The namespace isolation policy name",
required = true
)
@PathParam("policyName") String policyName
) throws Exception {
validateSuperUserAccess();
validateClusterExists(cluster);
validatePoliciesReadOnlyAccess();
try {
String nsIsolationPolicyPath = path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES);
NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies()
.getPolicies(nsIsolationPolicyPath).orElseGet(() -> {
try {
namespaceIsolationPolicies().setWithCreate(nsIsolationPolicyPath,
(p) -> Collections.emptyMap());
return new NamespaceIsolationPolicies();
} catch (Exception e) {
throw new RestException(e);
}
});
nsIsolationPolicies.deletePolicy(policyName);
namespaceIsolationPolicies().set(nsIsolationPolicyPath, old -> nsIsolationPolicies.getPolicies());
} catch (NotFoundException nne) {
log.warn("[{}] Failed to update brokers/{}/namespaceIsolationPolicies: Does not exist", clientAppId(),
cluster);
throw new RestException(Status.NOT_FOUND,
"NamespaceIsolationPolicies for cluster " + cluster + " does not exist");
} catch (Exception e) {
log.error("[{}] Failed to update brokers/{}/namespaceIsolationPolicies/{}", clientAppId(), cluster,
policyName, e);
throw new RestException(e);
}
}
@POST
@Path("/{cluster}/failureDomains/{domainName}")
@ApiOperation(
value = "Set the failure domain of the cluster.",
notes = "This operation requires Pulsar superuser privileges."
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission."),
@ApiResponse(code = 404, message = "Failure domain doesn't exist."),
@ApiResponse(code = 409, message = "Broker already exists in another domain."),
@ApiResponse(code = 412, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public void setFailureDomain(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster,
@ApiParam(
value = "The failure domain name",
required = true
)
@PathParam("domainName") String domainName,
@ApiParam(
value = "The configuration data of a failure domain",
required = true
)
FailureDomain domain
) throws Exception {
validateSuperUserAccess();
validateClusterExists(cluster);
validateBrokerExistsInOtherDomain(cluster, domainName, domain);
try {
String domainPath = joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT, domainName);
FailureDomainResources failureDomainListCache = clusterResources().getFailureDomainResources();
failureDomainListCache.setWithCreate(domainPath, old -> domain);
} catch (NotFoundException nne) {
log.warn("[{}] Failed to update domain {}. clusters {} Does not exist", clientAppId(), cluster,
domainName);
throw new RestException(Status.NOT_FOUND,
"Domain " + domainName + " for cluster " + cluster + " does not exist");
} catch (Exception e) {
log.error("[{}] Failed to update clusters/{}/domainName/{}", clientAppId(), cluster, domainName, e);
throw new RestException(e);
}
}
@GET
@Path("/{cluster}/failureDomains")
@ApiOperation(
value = "Get the cluster failure domains.",
response = FailureDomain.class,
responseContainer = "Map",
notes = "This operation requires Pulsar superuser privileges."
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 500, message = "Internal server error")
})
public Map<String, FailureDomain> getFailureDomains(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster
) throws Exception {
validateSuperUserAccess();
Map<String, FailureDomain> domains = Maps.newHashMap();
try {
final String failureDomainRootPath = pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
FailureDomainResources failureDomainListCache = clusterResources().getFailureDomainResources();
for (String domainName : failureDomainListCache.getChildren(failureDomainRootPath)) {
try {
Optional<FailureDomain> domain = failureDomainListCache
.get(joinPath(failureDomainRootPath, domainName));
domain.ifPresent(failureDomain -> domains.put(domainName, failureDomain));
} catch (Exception e) {
log.warn("Failed to get domain {}", domainName, e);
}
}
} catch (NotFoundException e) {
log.warn("[{}] Failure-domain is not configured for cluster {}", clientAppId(), cluster, e);
return Collections.emptyMap();
} catch (Exception e) {
log.error("[{}] Failed to get failure-domains for cluster {}", clientAppId(), cluster, e);
throw new RestException(e);
}
return domains;
}
@GET
@Path("/{cluster}/failureDomains/{domainName}")
@ApiOperation(
value = "Get a domain in a cluster",
response = FailureDomain.class,
notes = "This operation requires Pulsar superuser privileges."
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "FailureDomain doesn't exist"),
@ApiResponse(code = 412, message = "Cluster doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error")
})
public FailureDomain getDomain(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster,
@ApiParam(
value = "The failure domain name",
required = true
)
@PathParam("domainName") String domainName
) throws Exception {
validateSuperUserAccess();
validateClusterExists(cluster);
try {
final String failureDomainRootPath = pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
return clusterResources().getFailureDomainResources().get(joinPath(failureDomainRootPath, domainName))
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Domain " + domainName + " for cluster " + cluster + " does not exist"));
} catch (RestException re) {
throw re;
} catch (Exception e) {
log.error("[{}] Failed to get domain {} for cluster {}", clientAppId(), domainName, cluster, e);
throw new RestException(e);
}
}
@DELETE
@Path("/{cluster}/failureDomains/{domainName}")
@ApiOperation(
value = "Delete the failure domain of the cluster",
notes = "This operation requires Pulsar superuser privileges."
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission or policy is read only"),
@ApiResponse(code = 404, message = "FailureDomain doesn't exist"),
@ApiResponse(code = 412, message = "Cluster doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error")
})
public void deleteFailureDomain(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster,
@ApiParam(
value = "The failure domain name",
required = true
)
@PathParam("domainName") String domainName
) throws Exception {
validateSuperUserAccess();
validateClusterExists(cluster);
try {
final String domainPath = joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT,
domainName);
clusterResources().getFailureDomainResources().delete(domainPath);
} catch (NotFoundException nne) {
log.warn("[{}] Domain {} does not exist in {}", clientAppId(), domainName, cluster);
throw new RestException(Status.NOT_FOUND,
"Domain-name " + domainName + " or cluster " + cluster + " does not exist");
} catch (Exception e) {
log.error("[{}] Failed to delete domain {} in cluster {}", clientAppId(), domainName, cluster, e);
throw new RestException(e);
}
}
private void validateBrokerExistsInOtherDomain(final String cluster, final String inputDomainName,
final FailureDomain inputDomain) {
if (inputDomain != null && inputDomain.brokers != null) {
try {
final String failureDomainRootPath = pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
for (String domainName : clusterResources().getFailureDomainResources()
.getChildren(failureDomainRootPath)) {
if (inputDomainName.equals(domainName)) {
continue;
}
try {
Optional<FailureDomain> domain =
clusterResources().getFailureDomainResources()
.get(joinPath(failureDomainRootPath, domainName));
if (domain.isPresent() && domain.get().brokers != null) {
List<String> duplicateBrokers = domain.get().brokers.stream().parallel()
.filter(inputDomain.brokers::contains).collect(Collectors.toList());
if (!duplicateBrokers.isEmpty()) {
throw new RestException(Status.CONFLICT,
duplicateBrokers + " already exists in " + domainName);
}
}
} catch (Exception e) {
if (e instanceof RestException) {
throw e;
}
log.warn("Failed to get domain {}", domainName, e);
}
}
} catch (NotFoundException e) {
if (log.isDebugEnabled()) {
log.debug("[{}] Domain is not configured for cluster", clientAppId(), e);
}
} catch (Exception e) {
log.error("[{}] Failed to get domains for cluster {}", clientAppId(), e);
throw new RestException(e);
}
}
}
private static final Logger log = LoggerFactory.getLogger(ClustersBase.class);
}