blob: d5ce9213211dd8e8b1d5192b6029756bb3bb4fcf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
import static java.lang.String.format;
import io.netty.buffer.ByteBuf;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType;
import org.apache.pulsar.common.lookup.GetTopicsResult;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BinaryProtoLookupService implements LookupService {
private final PulsarClientImpl client;
private final ServiceNameResolver serviceNameResolver;
private final boolean useTls;
private final ExecutorService executor;
private final String listenerName;
private final int maxLookupRedirects;
public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
boolean useTls,
ExecutorService executor)
throws PulsarClientException {
this(client, serviceUrl, null, useTls, executor);
}
public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
String listenerName,
boolean useTls,
ExecutorService executor)
throws PulsarClientException {
this.client = client;
this.useTls = useTls;
this.executor = executor;
this.maxLookupRedirects = client.getConfiguration().getMaxLookupRedirects();
this.serviceNameResolver = new PulsarServiceNameResolver();
this.listenerName = listenerName;
updateServiceUrl(serviceUrl);
}
@Override
public void updateServiceUrl(String serviceUrl) throws PulsarClientException {
serviceNameResolver.updateServiceUrl(serviceUrl);
}
/**
* Calls broker binaryProto-lookup api to find broker-service address which can serve a given topic.
*
* @param topicName
* topic-name
* @return broker-socket-address that serves given topic
*/
public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(TopicName topicName) {
return findBroker(serviceNameResolver.resolveHost(), false, topicName, 0);
}
/**
* calls broker binaryProto-lookup api to get metadata of partitioned-topic.
*
*/
public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName) {
return getPartitionedTopicMetadata(serviceNameResolver.resolveHost(), topicName);
}
private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker(InetSocketAddress socketAddress,
boolean authoritative, TopicName topicName, final int redirectCount) {
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> addressFuture = new CompletableFuture<>();
if (maxLookupRedirects > 0 && redirectCount > maxLookupRedirects) {
addressFuture.completeExceptionally(
new PulsarClientException.LookupException("Too many redirects: " + maxLookupRedirects));
return addressFuture;
}
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newLookup(topicName.toString(), listenerName, authoritative, requestId);
clientCnx.newLookup(request, requestId).whenComplete((r, t) -> {
if (t != null) {
// lookup failed
log.warn("[{}] failed to send lookup request : {}", topicName, t.getMessage());
if (log.isDebugEnabled()) {
log.debug("[{}] Lookup response exception: {}", topicName, t);
}
addressFuture.completeExceptionally(t);
} else {
URI uri = null;
try {
// (1) build response broker-address
if (useTls) {
uri = new URI(r.brokerUrlTls);
} else {
String serviceUrl = r.brokerUrl;
uri = new URI(serviceUrl);
}
InetSocketAddress responseBrokerAddress =
InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
// (2) redirect to given address if response is: redirect
if (r.redirect) {
findBroker(responseBrokerAddress, r.authoritative, topicName, redirectCount + 1)
.thenAccept(addressFuture::complete)
.exceptionally((lookupException) -> {
Throwable cause = FutureUtil.unwrapCompletionException(lookupException);
// lookup failed
if (redirectCount > 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] lookup redirection failed ({}) : {}", topicName,
redirectCount, cause.getMessage());
}
} else {
log.warn("[{}] lookup failed : {}", topicName,
cause.getMessage(), cause);
}
addressFuture.completeExceptionally(cause);
return null;
});
} else {
// (3) received correct broker to connect
if (r.proxyThroughServiceUrl) {
// Connect through proxy
addressFuture.complete(Pair.of(responseBrokerAddress, socketAddress));
} else {
// Normal result with direct connection to broker
addressFuture.complete(Pair.of(responseBrokerAddress, responseBrokerAddress));
}
}
} catch (Exception parseUrlException) {
// Failed to parse url
log.warn("[{}] invalid url {} : {}", topicName, uri, parseUrlException.getMessage(),
parseUrlException);
addressFuture.completeExceptionally(parseUrlException);
}
}
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally(connectionException -> {
addressFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
return null;
});
return addressFuture;
}
private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(InetSocketAddress socketAddress,
TopicName topicName) {
CompletableFuture<PartitionedTopicMetadata> partitionFuture = new CompletableFuture<>();
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
clientCnx.newLookup(request, requestId).whenComplete((r, t) -> {
if (t != null) {
log.warn("[{}] failed to get Partitioned metadata : {}", topicName,
t.getMessage(), t);
partitionFuture.completeExceptionally(t);
} else {
try {
partitionFuture.complete(new PartitionedTopicMetadata(r.partitions));
} catch (Exception e) {
partitionFuture.completeExceptionally(new PulsarClientException.LookupException(
format("Failed to parse partition-response redirect=%s, topic=%s, partitions with %s,"
+ " error message %s",
r.redirect, topicName, r.partitions,
e.getMessage())));
}
}
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally(connectionException -> {
partitionFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
return null;
});
return partitionFuture;
}
@Override
public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName) {
return getSchema(topicName, null);
}
@Override
public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, byte[] version) {
CompletableFuture<Optional<SchemaInfo>> schemaFuture = new CompletableFuture<>();
if (version != null && version.length == 0) {
schemaFuture.completeExceptionally(new SchemaSerializationException("Empty schema version"));
return schemaFuture;
}
InetSocketAddress socketAddress = serviceNameResolver.resolveHost();
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(),
Optional.ofNullable(BytesSchemaVersion.of(version)));
clientCnx.sendGetSchema(request, requestId).whenComplete((r, t) -> {
if (t != null) {
log.warn("[{}] failed to get schema : {}", topicName,
t.getMessage(), t);
schemaFuture.completeExceptionally(t);
} else {
schemaFuture.complete(r);
}
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally(ex -> {
schemaFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
return null;
});
return schemaFuture;
}
public String getServiceUrl() {
return serviceNameResolver.getServiceUrl();
}
@Override
public InetSocketAddress resolveHost() {
return serviceNameResolver.resolveHost();
}
@Override
public CompletableFuture<GetTopicsResult> getTopicsUnderNamespace(NamespaceName namespace,
Mode mode,
String topicsPattern,
String topicsHash) {
CompletableFuture<GetTopicsResult> topicsFuture = new CompletableFuture<>();
AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
Backoff backoff = new BackoffBuilder()
.setInitialTime(100, TimeUnit.MILLISECONDS)
.setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
.setMax(1, TimeUnit.MINUTES)
.create();
getTopicsUnderNamespace(serviceNameResolver.resolveHost(), namespace, backoff, opTimeoutMs, topicsFuture, mode,
topicsPattern, topicsHash);
return topicsFuture;
}
private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
NamespaceName namespace,
Backoff backoff,
AtomicLong remainingTime,
CompletableFuture<GetTopicsResult> getTopicsResultFuture,
Mode mode,
String topicsPattern,
String topicsHash) {
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newGetTopicsOfNamespaceRequest(
namespace.toString(), requestId, mode, topicsPattern, topicsHash);
clientCnx.newGetTopicsOfNamespace(request, requestId).whenComplete((r, t) -> {
if (t != null) {
getTopicsResultFuture.completeExceptionally(t);
} else {
if (log.isDebugEnabled()) {
log.debug("[namespace: {}] Success get topics list in request: {}",
namespace, requestId);
}
// do not keep partition part of topic name
List<String> result = new ArrayList<>();
r.getTopics().forEach(topic -> {
String filtered = TopicName.get(topic).getPartitionedTopicName();
if (!result.contains(filtered)) {
result.add(filtered);
}
});
getTopicsResultFuture.complete(new GetTopicsResult(result, r.getTopicsHash(),
r.isFiltered(), r.isChanged()));
}
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally((e) -> {
long nextDelay = Math.min(backoff.next(), remainingTime.get());
if (nextDelay <= 0) {
getTopicsResultFuture.completeExceptionally(
new PulsarClientException.TimeoutException(
format("Could not get topics of namespace %s within configured timeout",
namespace.toString())));
return null;
}
((ScheduledExecutorService) executor).schedule(() -> {
log.warn("[namespace: {}] Could not get connection while getTopicsUnderNamespace -- Will try again in"
+ " {} ms", namespace, nextDelay);
remainingTime.addAndGet(-nextDelay);
getTopicsUnderNamespace(socketAddress, namespace, backoff, remainingTime, getTopicsResultFuture,
mode, topicsPattern, topicsHash);
}, nextDelay, TimeUnit.MILLISECONDS);
return null;
});
}
@Override
public void close() throws Exception {
// no-op
}
public static class LookupDataResult {
public final String brokerUrl;
public final String brokerUrlTls;
public final int partitions;
public final boolean authoritative;
public final boolean proxyThroughServiceUrl;
public final boolean redirect;
public LookupDataResult(CommandLookupTopicResponse result) {
this.brokerUrl = result.hasBrokerServiceUrl() ? result.getBrokerServiceUrl() : null;
this.brokerUrlTls = result.hasBrokerServiceUrlTls() ? result.getBrokerServiceUrlTls() : null;
this.authoritative = result.isAuthoritative();
this.redirect = result.hasResponse() && result.getResponse() == LookupType.Redirect;
this.proxyThroughServiceUrl = result.isProxyThroughServiceUrl();
this.partitions = -1;
}
public LookupDataResult(int partitions) {
super();
this.partitions = partitions;
this.brokerUrl = null;
this.brokerUrlTls = null;
this.authoritative = false;
this.proxyThroughServiceUrl = false;
this.redirect = false;
}
}
private static final Logger log = LoggerFactory.getLogger(BinaryProtoLookupService.class);
}