blob: 967059c07184b2a5f82d99f0b56052fec6cc920c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.lookup;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
import static org.apache.pulsar.common.protocol.Commands.newLookupResponse;
import io.netty.buffer.ByteBuf;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import javax.ws.rs.Encoded;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TopicLookupBase extends PulsarWebResource {
private static final String LOOKUP_PATH_V1 = "/lookup/v2/destination/";
private static final String LOOKUP_PATH_V2 = "/lookup/v2/topic/";
protected void internalLookupTopicAsync(TopicName topicName, boolean authoritative,
AsyncResponse asyncResponse, String listenerName) {
if (!pulsar().getBrokerService().getLookupRequestSemaphore().tryAcquire()) {
log.warn("No broker was found available for topic {}", topicName);
asyncResponse.resume(new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
return;
}
try {
validateClusterOwnership(topicName.getCluster());
validateAdminAndClientPermission(topicName);
validateGlobalNamespaceOwnership(topicName.getNamespaceObject());
} catch (WebApplicationException we) {
// Validation checks failed
log.error("Validation check failed: {}", we.getMessage());
completeLookupResponseExceptionally(asyncResponse, we);
return;
} catch (Throwable t) {
// Validation checks failed with unknown error
log.error("Validation check failed: {}", t.getMessage(), t);
completeLookupResponseExceptionally(asyncResponse, new RestException(t));
return;
}
CompletableFuture<Optional<LookupResult>> lookupFuture = pulsar().getNamespaceService()
.getBrokerServiceUrlAsync(topicName,
LookupOptions.builder().advertisedListenerName(listenerName)
.authoritative(authoritative).loadTopicsInBundle(false).build());
lookupFuture.thenAccept(optionalResult -> {
if (optionalResult == null || !optionalResult.isPresent()) {
log.warn("No broker was found available for topic {}", topicName);
completeLookupResponseExceptionally(asyncResponse,
new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
return;
}
LookupResult result = optionalResult.get();
// We have found either a broker that owns the topic, or a broker to which we should redirect the client to
if (result.isRedirect()) {
boolean newAuthoritative = result.isAuthoritativeRedirect();
URI redirect;
try {
String redirectUrl = isRequestHttps() ? result.getLookupData().getHttpUrlTls()
: result.getLookupData().getHttpUrl();
checkNotNull(redirectUrl, "Redirected cluster's service url is not configured");
String lookupPath = topicName.isV2() ? LOOKUP_PATH_V2 : LOOKUP_PATH_V1;
String path = String.format("%s%s%s?authoritative=%s",
redirectUrl, lookupPath, topicName.getLookupName(), newAuthoritative);
path = listenerName == null ? path : path + "&listenerName=" + listenerName;
redirect = new URI(path);
} catch (URISyntaxException | NullPointerException e) {
log.error("Error in preparing redirect url for {}: {}", topicName, e.getMessage(), e);
completeLookupResponseExceptionally(asyncResponse, e);
return;
}
if (log.isDebugEnabled()) {
log.debug("Redirect lookup for topic {} to {}", topicName, redirect);
}
completeLookupResponseExceptionally(asyncResponse,
new WebApplicationException(Response.temporaryRedirect(redirect).build()));
} else {
// Found broker owning the topic
if (log.isDebugEnabled()) {
log.debug("Lookup succeeded for topic {} -- broker: {}", topicName, result.getLookupData());
}
completeLookupResponseSuccessfully(asyncResponse, result.getLookupData());
}
}).exceptionally(exception -> {
log.warn("Failed to lookup broker for topic {}: {}", topicName, exception.getMessage(), exception);
completeLookupResponseExceptionally(asyncResponse, exception);
return null;
});
}
private void validateAdminAndClientPermission(TopicName topic) throws RestException, Exception {
try {
validateTopicOperation(topic, TopicOperation.LOOKUP);
} catch (Exception e) {
// unknown error marked as internal server error
throw new RestException(e);
}
}
protected String internalGetNamespaceBundle(TopicName topicName) {
validateNamespaceOperation(topicName.getNamespaceObject(), NamespaceOperation.GET_BUNDLE);
try {
NamespaceBundle bundle = pulsar().getNamespaceService().getBundle(topicName);
return bundle.getBundleRange();
} catch (Exception e) {
log.error("[{}] Failed to get namespace bundle for {}", clientAppId(), topicName, e);
throw new RestException(e);
}
}
/**
* Lookup broker-service address for a given namespace-bundle which contains given topic.
*
* a. Returns broker-address if namespace-bundle is already owned by any broker
* b. If current-broker receives lookup-request and if it's not a leader then current broker redirects request
* to leader by returning leader-service address.
* c. If current-broker is leader then it finds out least-loaded broker to
* own namespace bundle and redirects request
* by returning least-loaded broker.
* d. If current-broker receives request to own the namespace-bundle then
* it owns a bundle and returns success(connect)
* response to client.
*
* @param pulsarService
* @param topicName
* @param authoritative
* @param clientAppId
* @param requestId
* @return
*/
public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarService, TopicName topicName,
boolean authoritative, String clientAppId, AuthenticationDataSource authenticationData, long requestId) {
return lookupTopicAsync(pulsarService, topicName, authoritative, clientAppId,
authenticationData, requestId, null);
}
/**
*
* Lookup broker-service address for a given namespace-bundle which contains given topic.
*
* a. Returns broker-address if namespace-bundle is already owned by any broker
* b. If current-broker receives lookup-request and if it's not a leader then current broker redirects request
* to leader by returning leader-service address.
* c. If current-broker is leader then it finds out least-loaded broker
* to own namespace bundle and redirects request
* by returning least-loaded broker.
* d. If current-broker receives request to own the namespace-bundle then
* it owns a bundle and returns success(connect)
* response to client.
*
* @param pulsarService
* @param topicName
* @param authoritative
* @param clientAppId
* @param requestId
* @param advertisedListenerName
* @return
*/
public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarService, TopicName topicName,
boolean authoritative, String clientAppId,
AuthenticationDataSource authenticationData,
long requestId, final String advertisedListenerName) {
final CompletableFuture<ByteBuf> validationFuture = new CompletableFuture<>();
final CompletableFuture<ByteBuf> lookupfuture = new CompletableFuture<>();
final String cluster = topicName.getCluster();
// (1) validate cluster
getClusterDataIfDifferentCluster(pulsarService, cluster, clientAppId).thenAccept(differentClusterData -> {
if (differentClusterData != null) {
if (log.isDebugEnabled()) {
log.debug("[{}] Redirecting the lookup call to {}/{} cluster={}", clientAppId,
differentClusterData.getBrokerServiceUrl(), differentClusterData.getBrokerServiceUrlTls(),
cluster);
}
validationFuture.complete(newLookupResponse(differentClusterData.getBrokerServiceUrl(),
differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
requestId, false));
} else {
// (2) authorize client
checkAuthorizationAsync(pulsarService, topicName, clientAppId, authenticationData).thenRun(() -> {
// (3) validate global namespace
checkLocalOrGetPeerReplicationCluster(pulsarService,
topicName.getNamespaceObject())
.thenAccept(peerClusterData -> {
if (peerClusterData == null) {
// (4) all validation passed: initiate lookup
validationFuture.complete(null);
return;
}
// if peer-cluster-data is present it means namespace is owned by that peer-cluster and
// request should be redirect to the peer-cluster
if (StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
&& StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
"Redirected cluster's brokerService url is not configured",
requestId));
return;
}
validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
requestId,
false));
}).exceptionally(ex -> {
validationFuture.complete(
newLookupErrorResponse(ServerError.MetadataError,
FutureUtil.unwrapCompletionException(ex).getMessage(), requestId));
return null;
});
})
.exceptionally(e -> {
Throwable throwable = FutureUtil.unwrapCompletionException(e);
if (throwable instanceof RestException) {
log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName);
validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
throwable.getMessage(), requestId));
} else {
log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName);
validationFuture.completeExceptionally(throwable);
}
return null;
});
}
}).exceptionally(ex -> {
validationFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
return null;
});
// Initiate lookup once validation completes
validationFuture.thenAccept(validationFailureResponse -> {
if (validationFailureResponse != null) {
lookupfuture.complete(validationFailureResponse);
} else {
LookupOptions options = LookupOptions.builder()
.authoritative(authoritative)
.advertisedListenerName(advertisedListenerName)
.loadTopicsInBundle(true)
.build();
pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, options)
.thenAccept(lookupResult -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Lookup result {}", topicName.toString(), lookupResult);
}
if (!lookupResult.isPresent()) {
lookupfuture.complete(newLookupErrorResponse(ServerError.ServiceNotReady,
"No broker was available to own " + topicName, requestId));
return;
}
LookupData lookupData = lookupResult.get().getLookupData();
if (lookupResult.get().isRedirect()) {
boolean newAuthoritative = lookupResult.get().isAuthoritativeRedirect();
lookupfuture.complete(
newLookupResponse(lookupData.getBrokerUrl(), lookupData.getBrokerUrlTls(),
newAuthoritative, LookupType.Redirect, requestId, false));
} else {
ServiceConfiguration conf = pulsarService.getConfiguration();
lookupfuture.complete(newLookupResponse(lookupData.getBrokerUrl(),
lookupData.getBrokerUrlTls(), true /* authoritative */, LookupType.Connect,
requestId, shouldRedirectThroughServiceUrl(conf, lookupData)));
}
}).exceptionally(ex -> {
if (ex instanceof CompletionException && ex.getCause() instanceof IllegalStateException) {
log.info("Failed to lookup {} for topic {} with error {}", clientAppId,
topicName.toString(), ex.getCause().getMessage());
} else {
log.warn("Failed to lookup {} for topic {} with error {}", clientAppId,
topicName.toString(), ex.getMessage(), ex);
}
lookupfuture.complete(
newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
return null;
});
}
}).exceptionally(ex -> {
if (ex instanceof CompletionException && ex.getCause() instanceof IllegalStateException) {
log.info("Failed to lookup {} for topic {} with error {}", clientAppId, topicName.toString(),
ex.getCause().getMessage());
} else {
log.warn("Failed to lookup {} for topic {} with error {}", clientAppId, topicName.toString(),
ex.getMessage(), ex);
}
lookupfuture.complete(newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
return null;
});
return lookupfuture;
}
private void completeLookupResponseExceptionally(AsyncResponse asyncResponse, Throwable t) {
pulsar().getBrokerService().getLookupRequestSemaphore().release();
asyncResponse.resume(t);
}
private void completeLookupResponseSuccessfully(AsyncResponse asyncResponse, LookupData lookupData) {
pulsar().getBrokerService().getLookupRequestSemaphore().release();
asyncResponse.resume(lookupData);
}
protected TopicName getTopicName(String topicDomain, String tenant, String cluster, String namespace,
@Encoded String encodedTopic) {
String decodedName = Codec.decode(encodedTopic);
return TopicName.get(TopicDomain.getEnum(topicDomain).value(), tenant, cluster, namespace, decodedName);
}
protected TopicName getTopicName(String topicDomain, String tenant, String namespace,
@Encoded String encodedTopic) {
String decodedName = Codec.decode(encodedTopic);
return TopicName.get(TopicDomain.getEnum(topicDomain).value(), tenant, namespace, decodedName);
}
private static boolean shouldRedirectThroughServiceUrl(ServiceConfiguration conf, LookupData lookupData) {
// When running in standalone mode we want to redirect the client through the service URL,
// if the advertised address is a loopback address (see PulsarStandaloneStarter).
if (!conf.isRunningStandalone()) {
return false;
}
if (!StringUtils.isEmpty(lookupData.getBrokerUrl())) {
try {
URI host = URI.create(lookupData.getBrokerUrl());
return InetAddress.getByName(host.getHost()).isLoopbackAddress();
} catch (Exception e) {
log.info("Failed to resolve advertised address {}: {}", lookupData.getBrokerUrl(), e.getMessage());
return false;
}
}
if (!StringUtils.isEmpty(lookupData.getBrokerUrlTls())) {
try {
URI host = URI.create(lookupData.getBrokerUrlTls());
return InetAddress.getByName(host.getHost()).isLoopbackAddress();
} catch (Exception e) {
log.info("Failed to resolve advertised address {}: {}", lookupData.getBrokerUrlTls(), e.getMessage());
return false;
}
}
return false;
}
private static final Logger log = LoggerFactory.getLogger(TopicLookupBase.class);
}