blob: 5f73bc949defc50dd270e31adfca2e9d9ccc1504 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin.impl;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import com.google.common.collect.Maps;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService.State;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.BrokerInfo;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
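/**
* Base class for the broker admin REST endpoints: active/leader broker lookup, owned namespaces,
* dynamic and runtime configuration, backlog-quota check, readiness and health checks, version
* and graceful shutdown.
*/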
public class BrokersBase extends AdminResource {
/** Logger shared by all endpoints of this resource. */
private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class);
/** Last segment of the topic name used by the health check ({@code persistent://<namespace>/healthcheck}). */
public static final String HEALTH_CHECK_TOPIC_SUFFIX = "healthcheck";
/**
 * Returns the brokers that the load manager currently reports as available.
 *
 * <p>Super-user access and cluster ownership are validated first; the broker set, or the
 * failure, is delivered through {@code asyncResponse}.
 *
 * @param asyncResponse suspended response that receives the result or the error
 * @param cluster cluster name taken from the request path
 */
@GET
@Path("/{cluster}")
@ApiOperation(
        value = "Get the list of active brokers (web service addresses) in the cluster."
                + "If authorization is not enabled, any cluster name is valid.",
        response = String.class,
        responseContainer = "Set")
@ApiResponses(
        value = {
                @ApiResponse(code = 307, message = "Current broker doesn't serve this cluster"),
                @ApiResponse(code = 401, message = "Authentication required"),
                @ApiResponse(code = 403, message = "This operation requires super-user access"),
                @ApiResponse(code = 404, message = "Cluster does not exist: cluster={clustername}") })
public void getActiveBrokers(@Suspended final AsyncResponse asyncResponse,
                             @PathParam("cluster") String cluster) {
    // Stage 1: the caller must be a super user and this broker must serve the cluster.
    final CompletableFuture<Void> authorized = validateSuperUserAccessAsync()
            .thenCompose(ignored -> validateClusterOwnershipAsync(cluster));

    // Stage 2: fetch the brokers and hand them (or the failure) to the response.
    authorized
            .thenCompose(ignored -> pulsar().getLoadManager().get().getAvailableBrokersAsync())
            .thenAccept(brokers -> {
                LOG.info("[{}] Successfully to get active brokers, cluster={}", clientAppId(), cluster);
                asyncResponse.resume(brokers);
            })
            .exceptionally(failure -> {
                // Redirects are expected control flow, so only real failures are logged.
                final boolean redirect = isRedirectException(failure);
                if (!redirect) {
                    LOG.error("[{}] Fail to get active brokers, cluster={}", clientAppId(), cluster, failure);
                }
                resumeAsyncResponseExceptionally(asyncResponse, failure);
                return null;
            });
}
/**
 * Returns the service URL of the current leader broker.
 *
 * <p>Responds with 404 when the leader election service has no current leader.
 *
 * @param asyncResponse suspended response that receives the {@link BrokerInfo} or the error
 */
@GET
@Path("/leaderBroker")
@ApiOperation(
        value = "Get the information of the leader broker.",
        response = BrokerInfo.class)
@ApiResponses(
        value = {
                @ApiResponse(code = 401, message = "Authentication required"),
                @ApiResponse(code = 403, message = "This operation requires super-user access"),
                @ApiResponse(code = 404, message = "Leader broker not found") })
public void getLeaderBroker(@Suspended final AsyncResponse asyncResponse) {
    validateSuperUserAccessAsync()
            .thenApply(ignored -> {
                // Resolve the leader, failing with NOT_FOUND when none is elected.
                final LeaderBroker leader = pulsar().getLeaderElectionService().getCurrentLeader()
                        .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Couldn't find leader broker"));
                return BrokerInfo.builder().serviceUrl(leader.getServiceUrl()).build();
            })
            .thenAccept(info -> {
                LOG.info("[{}] Successfully to get the information of the leader broker.", clientAppId());
                asyncResponse.resume(info);
            })
            .exceptionally(failure -> {
                LOG.error("[{}] Failed to get the information of the leader broker.", clientAppId(), failure);
                resumeAsyncResponseExceptionally(asyncResponse, failure);
                return null;
            });
}
/**
 * Returns the ownership status of the namespaces served by this broker.
 *
 * <p>The caller must be a super user, the cluster must be served by this broker, and
 * {@code broker} must pass {@code validateBrokerName}.
 *
 * @param cluster cluster name taken from the request path
 * @param broker broker web-service URL taken from the request path
 * @return namespace name mapped to its ownership status
 * @throws RestException if validation fails or the ownership status cannot be retrieved
 */
@GET
@Path("/{clusterName}/{broker-webserviceurl}/ownedNamespaces")
@ApiOperation(value = "Get the list of namespaces served by the specific broker",
        response = NamespaceOwnershipStatus.class, responseContainer = "Map")
@ApiResponses(value = {
        @ApiResponse(code = 307, message = "Current broker doesn't serve the cluster"),
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Cluster doesn't exist") })
public Map<String, NamespaceOwnershipStatus> getOwnedNamespaces(@PathParam("clusterName") String cluster,
        @PathParam("broker-webserviceurl") String broker) throws Exception {
    validateSuperUserAccess();
    validateClusterOwnership(cluster);
    validateBrokerName(broker);
    try {
        // now we validated that this is the broker specified in the request
        return pulsar().getNamespaceService().getOwnedNameSpacesStatus();
    } catch (Exception e) {
        // Pass the exception as the last argument so the cause and stack trace are logged.
        LOG.error("[{}] Failed to get the namespace ownership status. cluster={}, broker={}", clientAppId(),
                cluster, broker, e);
        throw new RestException(e);
    }
}
/**
 * Persists a new value for a dynamic service configuration entry.
 *
 * @param asyncResponse suspended response that receives the outcome
 * @param configName configuration key taken from the request path
 * @param configValue configuration value taken from the request path
 */
@POST
@Path("/configuration/{configName}/{configValue}")
@ApiOperation(value =
        "Update dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.")
@ApiResponses(value = {
        @ApiResponse(code = 204, message = "Service configuration updated successfully"),
        @ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"),
        @ApiResponse(code = 404, message = "Configuration not found"),
        @ApiResponse(code = 412, message = "Invalid dynamic-config value"),
        @ApiResponse(code = 500, message = "Internal server error") })
public void updateDynamicConfiguration(@Suspended AsyncResponse asyncResponse,
                                       @PathParam("configName") String configName,
                                       @PathParam("configValue") String configValue) {
    final CompletableFuture<Void> persisted = validateSuperUserAccessAsync()
            .thenCompose(ignored -> persistDynamicConfigurationAsync(configName, configValue));

    persisted
            .thenRun(() -> {
                LOG.info("[{}] Updated Service configuration {}/{}", clientAppId(), configName, configValue);
                asyncResponse.resume(Response.ok().build());
            })
            .exceptionally(failure -> {
                LOG.error("[{}] Failed to update configuration {}/{}", clientAppId(), configName, configValue,
                        failure);
                resumeAsyncResponseExceptionally(asyncResponse, failure);
                return null;
            });
}
/**
 * Removes a dynamic service configuration entry from the stored configuration map.
 *
 * @param asyncResponse suspended response that receives the outcome
 * @param configName configuration key taken from the request path
 */
@DELETE
@Path("/configuration/{configName}")
@ApiOperation(value =
        "Delete dynamic ServiceConfiguration into metadata only."
                + " This operation requires Pulsar super-user privileges.")
@ApiResponses(value = { @ApiResponse(code = 204, message = "Service configuration updated successfully"),
        @ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"),
        @ApiResponse(code = 412, message = "Invalid dynamic-config value"),
        @ApiResponse(code = 500, message = "Internal server error") })
public void deleteDynamicConfiguration(
        @Suspended AsyncResponse asyncResponse,
        @PathParam("configName") String configName) {
    final CompletableFuture<Void> deleted = validateSuperUserAccessAsync()
            .thenCompose(ignored -> internalDeleteDynamicConfigurationOnMetadataAsync(configName));

    deleted
            .thenRun(() -> {
                LOG.info("[{}] Successfully to delete dynamic configuration {}", clientAppId(), configName);
                asyncResponse.resume(Response.ok().build());
            })
            .exceptionally(failure -> {
                LOG.error("[{}] Failed to delete dynamic configuration {}", clientAppId(), configName, failure);
                resumeAsyncResponseExceptionally(asyncResponse, failure);
                return null;
            });
}
/**
 * Returns the stored dynamic configuration overrides.
 *
 * @return the stored key/value overrides, or an empty map when none are stored
 * @throws RestException if the caller is not a super user or the lookup fails
 */
@GET
@Path("/configuration/values")
@ApiOperation(value = "Get value of all dynamic configurations' value overridden on local config")
@ApiResponses(value = {
        @ApiResponse(code = 403, message = "You don't have admin permission to view configuration"),
        @ApiResponse(code = 404, message = "Configuration not found"),
        @ApiResponse(code = 500, message = "Internal server error")})
public Map<String, String> getAllDynamicConfigurations() throws Exception {
    validateSuperUserAccess();

    final Map<String, String> overrides;
    try {
        overrides = dynamicConfigurationResources()
                .getDynamicConfiguration()
                .orElseGet(Collections::emptyMap);
    } catch (RestException e) {
        // Already a REST-level error: log it and propagate unchanged.
        LOG.error("[{}] couldn't find any configuration in zk {}", clientAppId(), e.getMessage(), e);
        throw e;
    } catch (Exception e) {
        // Anything else is wrapped so the client receives a REST error.
        LOG.error("[{}] Failed to retrieve configuration from zk {}", clientAppId(), e.getMessage(), e);
        throw new RestException(e);
    }
    return overrides;
}
/**
 * Lists the names of the configuration entries that can be updated dynamically.
 *
 * @return names reported by {@code BrokerService.getDynamicConfiguration()}
 */
@GET
@Path("/configuration")
@ApiOperation(value = "Get all updatable dynamic configurations's name")
@ApiResponses(value = {
        @ApiResponse(code = 403, message = "You don't have admin permission to get configuration")})
public List<String> getDynamicConfigurationName() {
    validateSuperUserAccess();
    final List<String> dynamicNames = BrokerService.getDynamicConfiguration();
    return dynamicNames;
}
/**
 * Returns the runtime configuration reported by the broker service.
 *
 * @return configuration key/value pairs
 */
@GET
@Path("/configuration/runtime")
@ApiOperation(value = "Get all runtime configurations. This operation requires Pulsar super-user privileges.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public Map<String, String> getRuntimeConfiguration() {
    validateSuperUserAccess();
    final Map<String, String> runtimeConfig = pulsar().getBrokerService().getRuntimeConfiguration();
    return runtimeConfig;
}
/**
 * Stores a dynamic configuration value in the shared configuration map.
 *
 * <p>The value is validated first, then the key is checked to be a dynamic
 * {@link ServiceConfiguration} field. Only then is the entry written, creating the map when it
 * does not exist yet. Both rejections produce a future failed with
 * {@code PRECONDITION_FAILED}.
 *
 * @param configName
 *            : configuration key
 * @param configValue
 *            : configuration value
 * @return future completed once the entry has been written, or failed if rejected
 */
private synchronized CompletableFuture<Void> persistDynamicConfigurationAsync(
        String configName, String configValue) {
    // Guard 1: the value must be acceptable for this key.
    final boolean valueAccepted = BrokerService.validateDynamicConfiguration(configName, configValue);
    if (!valueAccepted) {
        return FutureUtil
                .failedFuture(new RestException(Status.PRECONDITION_FAILED, " Invalid dynamic-config value"));
    }

    // Guard 2: only dynamic keys may be changed at runtime.
    if (!BrokerService.isDynamicConfiguration(configName)) {
        return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
                "Can't update non-dynamic configuration"));
    }

    return dynamicConfigurationResources().setDynamicConfigurationWithCreateAsync(existing -> {
        final Map<String, String> updated = existing.orElseGet(Maps::newHashMap);
        updated.put(configName, configValue);
        return updated;
    });
}
/**
 * Returns the broker's internal configuration data.
 *
 * @return the data exposed by {@code pulsar().getInternalConfigurationData()}
 */
@GET
@Path("/internal-configuration")
@ApiOperation(value = "Get the internal configuration data", response = InternalConfigurationData.class)
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public InternalConfigurationData getInternalConfigurationData() {
    validateSuperUserAccess();
    final InternalConfigurationData internalConfig = pulsar().getInternalConfigurationData();
    return internalConfig;
}
/**
 * Triggers a backlog quota check on the broker's backlog-quota executor.
 *
 * <p>The response is resumed from the executor task: with "no content" on success, or with a
 * {@link RestException} wrapping the failure.
 *
 * @param asyncResponse suspended response resumed once the check finishes
 */
@GET
@Path("/backlog-quota-check")
@ApiOperation(value = "An REST endpoint to trigger backlogQuotaCheck")
@ApiResponses(value = {
        @ApiResponse(code = 200, message = "Everything is OK"),
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 500, message = "Internal server error")})
public void backlogQuotaCheck(@Suspended AsyncResponse asyncResponse) {
    validateSuperUserAccess();

    // The task that runs the check and reports the outcome to the client.
    final Runnable quotaCheckTask = safeRun(() -> {
        try {
            pulsar().getBrokerService().monitorBacklogQuota();
            asyncResponse.resume(Response.noContent().build());
        } catch (Exception e) {
            LOG.error("trigger backlogQuotaCheck fail", e);
            asyncResponse.resume(new RestException(e));
        }
    });

    pulsar().getBrokerService().getBacklogQuotaChecker().execute(quotaCheckTask);
}
/**
 * Reports whether the broker has reached the {@code Started} state.
 *
 * @param asyncResponse resumed with 200 "ok" when started, otherwise with a server error
 */
@GET
@Path("/ready")
@ApiOperation(value = "Check if the broker is fully initialized")
@ApiResponses(value = {
        @ApiResponse(code = 200, message = "Broker is ready"),
        @ApiResponse(code = 500, message = "Broker is not ready") })
public void isReady(@Suspended AsyncResponse asyncResponse) {
    final boolean started = pulsar().getState() == State.Started;
    final Response reply = started
            ? Response.ok("ok").build()
            : Response.serverError().build();
    asyncResponse.resume(reply);
}
/**
 * Runs the broker health check and answers "ok" when it succeeds.
 *
 * @param asyncResponse suspended response that receives "ok" or the error
 * @param topicVersion selects which heartbeat namespace the health-check topic lives in
 */
@GET
@Path("/health")
@ApiOperation(value = "Run a healthCheck against the broker")
@ApiResponses(value = {
        @ApiResponse(code = 200, message = "Everything is OK"),
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Cluster doesn't exist"),
        @ApiResponse(code = 500, message = "Internal server error")})
@ApiParam(value = "Topic Version")
public void healthCheck(@Suspended AsyncResponse asyncResponse,
                        @QueryParam("topicVersion") TopicVersion topicVersion) {
    final CompletableFuture<Void> checked = validateSuperUserAccessAsync()
            .thenCompose(ignored -> internalRunHealthCheck(topicVersion));

    checked
            .thenRun(() -> {
                LOG.info("[{}] Successfully run health check.", clientAppId());
                asyncResponse.resume("ok");
            })
            .exceptionally(failure -> {
                LOG.error("[{}] Fail to run health check.", clientAppId(), failure);
                resumeAsyncResponseExceptionally(asyncResponse, failure);
                return null;
            });
}
/**
 * Runs a produce/consume round trip against the broker's health-check topic.
 *
 * <p>Steps: load (creating if needed) the health-check topic in the heartbeat namespace,
 * create a producer and a reader starting at the latest message, send a random UUID, and read
 * until that UUID comes back. The producer and reader are closed through
 * {@link #closeAndReCheck} whether or not the round trip succeeded.
 *
 * @param topicVersion {@code V2} selects the V2 heartbeat namespace; any other value
 *                     (including {@code null}) selects the original one
 * @return future completed when the round trip succeeds, or failed with the first error
 */
private CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion) {
    NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
            ? NamespaceService.getHeartbeatNamespaceV2(pulsar().getAdvertisedAddress(), pulsar().getConfiguration())
            : NamespaceService.getHeartbeatNamespace(pulsar().getAdvertisedAddress(), pulsar().getConfiguration());
    final String topicName = String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
    LOG.info("[{}] Running healthCheck with topic={}", clientAppId(), topicName);
    // The same UUID is used as the message payload and as part of the subscription name,
    // so each check can recognise its own message.
    final String messageStr = UUID.randomUUID().toString();
    final String subscriptionName = "healthCheck-" + messageStr;
    // create non-partitioned topic manually and close the previous reader if present.
    return pulsar().getBrokerService().getTopic(topicName, true)
            .thenCompose(topicOptional -> {
                if (!topicOptional.isPresent()) {
                    LOG.error("[{}] Fail to run health check while get topic {}. because get null value.",
                            clientAppId(), topicName);
                    throw new RestException(Status.NOT_FOUND,
                            String.format("Topic [%s] not found after create.", topicName));
                }
                PulsarClient client;
                try {
                    client = pulsar().getClient();
                } catch (PulsarServerException e) {
                    // Keep the cause in the log; the RestException alone loses the stack trace.
                    LOG.error("[{}] Fail to run health check while get client.", clientAppId(), e);
                    throw new RestException(e);
                }
                CompletableFuture<Void> resultFuture = new CompletableFuture<>();
                client.newProducer(Schema.STRING).topic(topicName).createAsync()
                        .thenCompose(producer -> client.newReader(Schema.STRING).topic(topicName)
                                .subscriptionName(subscriptionName)
                                .startMessageId(MessageId.latest)
                                .createAsync().exceptionally(createException -> {
                                    // The reader could not be created: release the producer that
                                    // was already opened, then propagate the original failure.
                                    producer.closeAsync().exceptionally(ex -> {
                                        LOG.error("[{}] Close producer fail while heath check.",
                                                clientAppId(), ex);
                                        return null;
                                    });
                                    throw FutureUtil.wrapToCompletionException(createException);
                                }).thenCompose(reader -> producer.sendAsync(messageStr)
                                        .thenCompose(__ -> healthCheckRecursiveReadNext(reader, messageStr))
                                        .whenComplete((__, ex) -> {
                                            // Always clean up, and only then publish the outcome
                                            // of the round trip itself.
                                            closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName)
                                                    .whenComplete((unused, innerEx) -> {
                                                        if (ex != null) {
                                                            resultFuture.completeExceptionally(ex);
                                                        } else {
                                                            resultFuture.complete(null);
                                                        }
                                                    });
                                        }
                                ))
                        ).exceptionally(ex -> {
                            // Covers failures before the cleanup stage (e.g. producer or reader
                            // creation); a no-op if resultFuture is already completed.
                            resultFuture.completeExceptionally(ex);
                            return null;
                        });
                return resultFuture;
            });
}
/**
 * Closes the health-check producer and reader, then handles any close failure.
 *
 * <ul>
 *   <li>Producer: a failed close is logged.</li>
 *   <li>Reader: a failed close is logged and, if the subscription still exists on the topic,
 *       the subscription is force-deleted.</li>
 * </ul>
 *
 * <p>Close failures are handled here and are not propagated to the returned future.
 *
 * @param producer Producer
 * @param reader Reader
 * @param topic Topic
 * @param subscriptionName Subscription name
 * @return future completed once both close attempts (and any failure handling) are done
 */
private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader,
                                                Topic topic, String subscriptionName) {
    // no matter exception or success, we still need to
    // close producer/reader
    CompletableFuture<Void> producerFuture = producer.closeAsync();
    CompletableFuture<Void> readerFuture = reader.closeAsync();
    List<CompletableFuture<Void>> futures = new ArrayList<>(2);
    futures.add(producerFuture);
    futures.add(readerFuture);
    return FutureUtil.waitForAll(Collections.unmodifiableList(futures))
            .exceptionally(closeException -> {
                // closeException is the failure reported by the combined future; it may stem
                // from either close call, so it is attached to whichever message is logged.
                final boolean readerFailed = readerFuture.isCompletedExceptionally();
                if (readerFailed) {
                    LOG.error("[{}] Close reader fail while heath check.", clientAppId(), closeException);
                    Subscription subscription =
                            topic.getSubscription(subscriptionName);
                    // re-check subscription after reader close
                    if (subscription != null) {
                        LOG.warn("[{}] Force delete subscription {} "
                                        + "when it still exists after the"
                                        + " reader is closed.",
                                clientAppId(), subscription);
                        subscription.deleteForcefully()
                                .exceptionally(ex -> {
                                    LOG.error("[{}] Force delete subscription fail"
                                                    + " while health check",
                                            clientAppId(), ex);
                                    return null;
                                });
                    }
                }
                // Report the producer failure as before when the reader closed cleanly, and
                // additionally when both close calls failed (previously unreported).
                if (!readerFailed || producerFuture.isCompletedExceptionally()) {
                    LOG.error("[{}] Close producer fail while heath check.", clientAppId(), closeException);
                }
                return null;
            });
}
/**
 * Reads messages from {@code reader} until one whose value equals {@code content} arrives.
 *
 * @param reader reader to pull messages from
 * @param content payload to wait for
 * @return future completed once the matching message has been read
 */
private CompletableFuture<Void> healthCheckRecursiveReadNext(Reader<String> reader, String content) {
    return reader.readNextAsync().thenCompose(received ->
            Objects.equals(content, received.getValue())
                    // Found our own message: done.
                    ? CompletableFuture.<Void>completedFuture(null)
                    // Another message: keep reading.
                    : healthCheckRecursiveReadNext(reader, content));
}
/**
 * Removes {@code configName} from the stored dynamic configuration map.
 *
 * <p>A non-dynamic key is rejected through a failed future rather than a synchronous throw, so
 * the method always reports errors through its returned future, like
 * {@code persistDynamicConfigurationAsync}.
 *
 * @param configName configuration key to remove
 * @return future completed once the map has been updated, or failed with
 *         {@code PRECONDITION_FAILED} when the key is not a dynamic configuration
 */
private CompletableFuture<Void> internalDeleteDynamicConfigurationOnMetadataAsync(String configName) {
    if (!BrokerService.isDynamicConfiguration(configName)) {
        return FutureUtil.failedFuture(
                new RestException(Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration"));
    }
    return dynamicConfigurationResources().setDynamicConfigurationAsync(old -> {
        // Nothing to remove when no configuration map is passed in.
        if (old != null) {
            old.remove(configName);
        }
        return old;
    });
}
/**
 * Returns the version string of this broker.
 *
 * @return the value of {@code PulsarVersion.getVersion()}
 */
@GET
@Path("/version")
@ApiOperation(value = "Get version of current broker")
@ApiResponses(value = {
        @ApiResponse(code = 200, message = "Everything is OK"),
        @ApiResponse(code = 500, message = "Internal server error")})
public String version() throws Exception {
    final String brokerVersion = PulsarVersion.getVersion();
    return brokerVersion;
}
/**
 * Starts a graceful shutdown of this broker after checking super-user access.
 *
 * @param maxConcurrentUnloadPerSec unload rate limit; 0 (absent) means no limit
 * @param forcedTerminateTopic forwarded to the namespace-bundle unload; defaults to {@code true}
 */
@POST
@Path("/shutdown")
@ApiOperation(value = "Shutdown broker gracefully.")
@ApiResponses(value = {
        @ApiResponse(code = 204, message = "Execute shutdown command successfully"),
        @ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"),
        @ApiResponse(code = 500, message = "Internal server error")})
public void shutDownBrokerGracefully(
        @ApiParam(
                name = "maxConcurrentUnloadPerSec",
                value = "if the value absent(value=0) means no concurrent limitation.")
        @QueryParam("maxConcurrentUnloadPerSec")
        int maxConcurrentUnloadPerSec,
        @QueryParam("forcedTerminateTopic")
        @DefaultValue("true")
        boolean forcedTerminateTopic) {
    // Authorize first; the actual shutdown sequence lives in the private helper.
    validateSuperUserAccess();
    doShutDownBrokerGracefully(maxConcurrentUnloadPerSec, forcedTerminateTopic);
}
/**
 * Unloads the namespace bundles gracefully and then starts closing the broker.
 *
 * <p>The close runs asynchronously and is not awaited; a failure is logged instead of being
 * silently dropped.
 *
 * @param maxConcurrentUnloadPerSec unload rate limit passed to the broker service
 * @param forcedTerminateTopic passed through to the namespace-bundle unload
 */
private void doShutDownBrokerGracefully(int maxConcurrentUnloadPerSec,
                                        boolean forcedTerminateTopic) {
    // Capture the requester now so the async callback does not read request state later.
    final String requester = clientAppId();
    pulsar().getBrokerService().unloadNamespaceBundlesGracefully(maxConcurrentUnloadPerSec, forcedTerminateTopic);
    pulsar().closeAsync().exceptionally(ex -> {
        LOG.error("[{}] Failed to close the broker during graceful shutdown.", requester, ex);
        return null;
    });
}
}