blob: 5a41037695ea5b2690eb7d89c8c15e19a79f471e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin.impl;
import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
import com.google.common.collect.Maps;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.PulsarService.State;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.policies.data.BrokerInfo;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Broker admin base.
*/
public class BrokersBase extends PulsarWebResource {
private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class);
@GET
@Path("/{cluster}")
@ApiOperation(
value = "Get the list of active brokers (web service addresses) in the cluster."
+ "If authorization is not enabled, any cluster name is valid.",
response = String.class,
responseContainer = "Set")
@ApiResponses(
value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve this cluster"),
@ApiResponse(code = 401, message = "Authentication required"),
@ApiResponse(code = 403, message = "This operation requires super-user access"),
@ApiResponse(code = 404, message = "Cluster does not exist: cluster={clustername}") })
public Set<String> getActiveBrokers(@PathParam("cluster") String cluster) throws Exception {
validateSuperUserAccess();
validateClusterOwnership(cluster);
try {
// Add Native brokers
return new HashSet<>(dynamicConfigurationResources().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT));
} catch (Exception e) {
LOG.error("[{}] Failed to get active broker list: cluster={}", clientAppId(), cluster, e);
throw new RestException(e);
}
}
@GET
@Path("/leaderBroker")
@ApiOperation(
value = "Get the information of the leader broker.",
response = BrokerInfo.class)
@ApiResponses(
value = {
@ApiResponse(code = 401, message = "Authentication required"),
@ApiResponse(code = 403, message = "This operation requires super-user access"),
@ApiResponse(code = 404, message = "Leader broker not found") })
public BrokerInfo getLeaderBroker() throws Exception {
validateSuperUserAccess();
try {
LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader()
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Couldn't find leader broker"));
BrokerInfo brokerInfo = new BrokerInfo();
brokerInfo.setServiceUrl(leaderBroker.getServiceUrl());
return brokerInfo;
} catch (Exception e) {
LOG.error("[{}] Failed to get the information of the leader broker.", clientAppId(), e);
throw new RestException(e);
}
}
@GET
@Path("/{clusterName}/{broker-webserviceurl}/ownedNamespaces")
@ApiOperation(value = "Get the list of namespaces served by the specific broker",
response = NamespaceOwnershipStatus.class, responseContainer = "Map")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the cluster"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Cluster doesn't exist") })
public Map<String, NamespaceOwnershipStatus> getOwnedNamespaces(@PathParam("clusterName") String cluster,
@PathParam("broker-webserviceurl") String broker) throws Exception {
validateSuperUserAccess();
validateClusterOwnership(cluster);
validateBrokerName(broker);
try {
// now we validated that this is the broker specified in the request
return pulsar().getNamespaceService().getOwnedNameSpacesStatus();
} catch (Exception e) {
LOG.error("[{}] Failed to get the namespace ownership status. cluster={}, broker={}", clientAppId(),
cluster, broker);
throw new RestException(e);
}
}
@POST
@Path("/configuration/{configName}/{configValue}")
@ApiOperation(value =
"Update dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.")
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Service configuration updated successfully"),
@ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"),
@ApiResponse(code = 404, message = "Configuration not found"),
@ApiResponse(code = 412, message = "Invalid dynamic-config value"),
@ApiResponse(code = 500, message = "Internal server error") })
public void updateDynamicConfiguration(@PathParam("configName") String configName,
@PathParam("configValue") String configValue) throws Exception {
validateSuperUserAccess();
persistDynamicConfiguration(configName, configValue);
}
@DELETE
@Path("/configuration/{configName}")
@ApiOperation(value =
"Delete dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.")
@ApiResponses(value = { @ApiResponse(code = 204, message = "Service configuration updated successfully"),
@ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"),
@ApiResponse(code = 412, message = "Invalid dynamic-config value"),
@ApiResponse(code = 500, message = "Internal server error") })
public void deleteDynamicConfiguration(@PathParam("configName") String configName) throws Exception {
validateSuperUserAccess();
deleteDynamicConfigurationOnZk(configName);
}
@GET
@Path("/configuration/values")
@ApiOperation(value = "Get value of all dynamic configurations' value overridden on local config")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "You don't have admin permission to view configuration"),
@ApiResponse(code = 404, message = "Configuration not found"),
@ApiResponse(code = 500, message = "Internal server error")})
public Map<String, String> getAllDynamicConfigurations() throws Exception {
validateSuperUserAccess();
try {
return dynamicConfigurationResources().get(BROKER_SERVICE_CONFIGURATION_PATH)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Couldn't find configuration in zk"));
} catch (RestException e) {
LOG.error("[{}] couldn't find any configuration in zk {}", clientAppId(), e.getMessage(), e);
throw e;
} catch (Exception e) {
LOG.error("[{}] Failed to retrieve configuration from zk {}", clientAppId(), e.getMessage(), e);
throw new RestException(e);
}
}
@GET
@Path("/configuration")
@ApiOperation(value = "Get all updatable dynamic configurations's name")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "You don't have admin permission to get configuration")})
public List<String> getDynamicConfigurationName() {
validateSuperUserAccess();
return BrokerService.getDynamicConfiguration();
}
@GET
@Path("/configuration/runtime")
@ApiOperation(value = "Get all runtime configurations. This operation requires Pulsar super-user privileges.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public Map<String, String> getRuntimeConfiguration() {
validateSuperUserAccess();
return pulsar().getBrokerService().getRuntimeConfiguration();
}
/**
* if {@link ServiceConfiguration}-field is allowed to be modified dynamically, update configuration-map into zk, so
* all other brokers get the watch and can see the change and take appropriate action on the change.
*
* @param configName
* : configuration key
* @param configValue
* : configuration value
*/
private synchronized void persistDynamicConfiguration(String configName, String configValue) {
try {
if (!BrokerService.validateDynamicConfiguration(configName, configValue)) {
throw new RestException(Status.PRECONDITION_FAILED, " Invalid dynamic-config value");
}
if (BrokerService.isDynamicConfiguration(configName)) {
dynamicConfigurationResources().setWithCreate(BROKER_SERVICE_CONFIGURATION_PATH, (old) -> {
Map<String, String> configurationMap = old.isPresent() ? old.get() : Maps.newHashMap();
configurationMap.put(configName, configValue);
return configurationMap;
});
LOG.info("[{}] Updated Service configuration {}/{}", clientAppId(), configName, configValue);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Can't update non-dynamic configuration {}/{}", clientAppId(), configName,
configValue);
}
throw new RestException(Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration");
}
} catch (RestException re) {
throw re;
} catch (Exception ie) {
LOG.error("[{}] Failed to update configuration {}/{}, {}", clientAppId(), configName, configValue,
ie.getMessage(), ie);
throw new RestException(ie);
}
}
@GET
@Path("/internal-configuration")
@ApiOperation(value = "Get the internal configuration data", response = InternalConfigurationData.class)
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public InternalConfigurationData getInternalConfigurationData() {
validateSuperUserAccess();
return pulsar().getInternalConfigurationData();
}
@GET
@Path("/backlog-quota-check")
@ApiOperation(value = "An REST endpoint to trigger backlogQuotaCheck")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Everything is OK"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 500, message = "Internal server error")})
public void backlogQuotaCheck(@Suspended AsyncResponse asyncResponse) {
validateSuperUserAccess();
pulsar().getBrokerService().executor().execute(()->{
try {
pulsar().getBrokerService().monitorBacklogQuota();
asyncResponse.resume(Response.noContent().build());
} catch (Exception e) {
LOG.error("trigger backlogQuotaCheck fail", e);
asyncResponse.resume(new RestException(e));
}
});
}
@GET
@Path("/ready")
@ApiOperation(value = "Check if the broker is fully initialized")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Broker is ready"),
@ApiResponse(code = 500, message = "Broker is not ready") })
public void isReady(@Suspended AsyncResponse asyncResponse) {
if (pulsar().getState() == State.Started) {
asyncResponse.resume(Response.ok("ok").build());
} else {
asyncResponse.resume(Response.serverError().build());
}
}
@GET
@Path("/health")
@ApiOperation(value = "Run a healthcheck against the broker")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Everything is OK"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Cluster doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error")})
public void healthcheck(@Suspended AsyncResponse asyncResponse) throws Exception {
validateSuperUserAccess();
String heartbeatNamespace = NamespaceService.getHeartbeatNamespace(
pulsar().getAdvertisedAddress(), pulsar().getConfiguration());
String topic = String.format("persistent://%s/healthcheck", heartbeatNamespace);
PulsarClient client = pulsar().getClient();
String messageStr = UUID.randomUUID().toString();
// create non-partitioned topic manually and close the previous reader if present.
try {
pulsar().getBrokerService().getTopic(topic, true).get().ifPresent(t -> {
for (Subscription value : t.getSubscriptions().values()) {
try {
value.deleteForcefully();
} catch (Exception e) {
LOG.warn("Failed to delete previous subscription {} for health check", value.getName(), e);
}
}
});
} catch (Exception e) {
LOG.warn("Failed to try to delete subscriptions for health check", e);
}
CompletableFuture<Producer<String>> producerFuture =
client.newProducer(Schema.STRING).topic(topic).createAsync();
CompletableFuture<Reader<String>> readerFuture = client.newReader(Schema.STRING)
.topic(topic).startMessageId(MessageId.latest).createAsync();
CompletableFuture<Void> completePromise = new CompletableFuture<>();
CompletableFuture.allOf(producerFuture, readerFuture).whenComplete(
(ignore, exception) -> {
if (exception != null) {
completePromise.completeExceptionally(exception);
} else {
producerFuture.thenCompose((producer) -> producer.sendAsync(messageStr))
.whenComplete((ignore2, exception2) -> {
if (exception2 != null) {
completePromise.completeExceptionally(exception2);
}
});
healthcheckReadLoop(readerFuture, completePromise, messageStr);
// timeout read loop after 10 seconds
ScheduledFuture<?> timeout = pulsar().getExecutor().schedule(() -> {
completePromise.completeExceptionally(new TimeoutException("Timed out reading"));
}, 10, TimeUnit.SECONDS);
// don't leave timeout dangling
completePromise.whenComplete((ignore2, exception2) -> {
timeout.cancel(false);
});
}
});
completePromise.whenComplete((ignore, exception) -> {
producerFuture.thenAccept((producer) -> {
producer.closeAsync().whenComplete((ignore2, exception2) -> {
if (exception2 != null) {
LOG.warn("Error closing producer for healthcheck", exception2);
}
});
});
readerFuture.thenAccept((reader) -> {
reader.closeAsync().whenComplete((ignore2, exception2) -> {
if (exception2 != null) {
LOG.warn("Error closing reader for healthcheck", exception2);
}
});
});
if (exception != null) {
asyncResponse.resume(new RestException(exception));
} else {
asyncResponse.resume("ok");
}
});
}
private void healthcheckReadLoop(CompletableFuture<Reader<String>> readerFuture,
CompletableFuture<?> completablePromise,
String messageStr) {
readerFuture.thenAccept((reader) -> {
CompletableFuture<Message<String>> readFuture = reader.readNextAsync()
.whenComplete((m, exception) -> {
if (exception != null) {
completablePromise.completeExceptionally(exception);
} else if (m.getValue().equals(messageStr)) {
completablePromise.complete(null);
} else {
healthcheckReadLoop(readerFuture, completablePromise, messageStr);
}
});
});
}
private synchronized void deleteDynamicConfigurationOnZk(String configName) {
try {
if (BrokerService.isDynamicConfiguration(configName)) {
dynamicConfigurationResources().set(BROKER_SERVICE_CONFIGURATION_PATH, (old) -> {
if (old != null) {
old.remove(configName);
}
return old;
});
LOG.info("[{}] Deleted Service configuration {}", clientAppId(), configName);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Can't update non-dynamic configuration {}", clientAppId(), configName);
}
throw new RestException(Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration");
}
} catch (RestException re) {
throw re;
} catch (Exception ie) {
LOG.error("[{}] Failed to update configuration {}, {}", clientAppId(), configName, ie.getMessage(), ie);
throw new RestException(ie);
}
}
@GET
@Path("/version")
@ApiOperation(value = "Get version of current broker")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Everything is OK"),
@ApiResponse(code = 500, message = "Internal server error")})
public String version() throws Exception {
return PulsarVersion.getVersion();
}
}