blob: ff42b91b3f7484b0e007e084cb33c22ae94435e7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "BinaryProtoLookupService.h"
#include "SharedBuffer.h"
#include <lib/TopicName.h>
#include "ConnectionPool.h"
#include <string>
DECLARE_LOG_OBJECT()
namespace pulsar {
/**
 * Start a broker lookup for the given topic.
 *
 * The request is sent to the host returned by the service name resolver and is
 * issued with the authoritative flag set to false.
 *
 * @param topicName the topic whose broker should be looked up
 * @return the future returned by findBroker() for this topic
 */
auto BinaryProtoLookupService::getBroker(const TopicName& topicName) -> LookupResultFuture {
    const std::string topic = topicName.toString();
    return findBroker(serviceNameResolver_.resolveHost(), false, topic);
}
/**
 * Send a topic lookup request to the broker at `address` and follow redirects
 * until a non-redirect response is received.
 *
 * @param address the address the lookup request is sent to
 * @param authoritative the authoritative flag forwarded in the lookup request; when a
 *        redirect response is followed, the value reported by that response is used
 * @param topic the topic name to look up
 * @return a future that is completed with the lookup result, or failed with the result
 *         code of the step that failed (connection, expired connection or lookup)
 */
auto BinaryProtoLookupService::findBroker(const std::string& address, bool authoritative,
                                          const std::string& topic) -> LookupResultFuture {
    LOG_DEBUG("find broker from " << address << ", authoritative: " << authoritative << ", topic: " << topic);
    auto promise = std::make_shared<Promise<Result, LookupResult>>();
    // NOTE: we can use move capture for topic since C++14
    cnxPool_.getConnectionAsync(address).addListener([this, promise, topic, address, authoritative](
                                                         Result result,
                                                         const ClientConnectionWeakPtr& weakCnx) {
        if (result != ResultOk) {
            promise->setFailed(result);
            return;
        }
        auto cnx = weakCnx.lock();
        if (!cnx) {
            LOG_ERROR("Connection to " << address << " is expired before lookup");
            promise->setFailed(ResultNotConnected);
            return;
        }
        auto lookupPromise = std::make_shared<LookupDataResultPromise>();
        // Forward the caller's authoritative flag: a redirected lookup must carry the value
        // from the redirect response instead of always being sent as non-authoritative.
        cnx->newTopicLookup(topic, authoritative, listenerName_, newRequestId(), lookupPromise);
        // cnx is captured by value so the connection stays referenced until the response arrives.
        lookupPromise->getFuture().addListener([this, cnx, promise, topic, address](
                                                   Result result, const LookupDataResultPtr& data) {
            // NOTE(review): if result is ResultOk but data is null, the promise is failed with
            // ResultOk - confirm whether that combination can actually occur.
            if (result != ResultOk || !data) {
                LOG_ERROR("Lookup failed for " << topic << ", result " << result);
                promise->setFailed(result);
                return;
            }

            // Pick the TLS or plain-text broker URL depending on the configured service URL.
            const auto responseBrokerAddress =
                (serviceNameResolver_.useTls() ? data->getBrokerUrlTls() : data->getBrokerUrl());
            if (data->isRedirect()) {
                LOG_DEBUG("Lookup request is for " << topic << " redirected to " << responseBrokerAddress);
                // Repeat the lookup against the redirect target and relay its outcome.
                findBroker(responseBrokerAddress, data->isAuthoritative(), topic)
                    .addListener([promise](Result result, const LookupResult& value) {
                        if (result == ResultOk) {
                            promise->setValue(value);
                        } else {
                            promise->setFailed(result);
                        }
                    });
            } else {
                LOG_DEBUG("Lookup response for " << topic << ", lookup-broker-url " << data->getBrokerUrl());
                if (data->shouldProxyThroughServiceUrl()) {
                    // logicalAddress is the proxy's address, we should still connect through proxy
                    promise->setValue({responseBrokerAddress, address});
                } else {
                    promise->setValue({responseBrokerAddress, responseBrokerAddress});
                }
            }
        });
    });
    return promise->getFuture();
}
/**
 * Request the partition metadata of a topic.
 *
 * @param topicName topic to get number of partitions. A null pointer fails the
 *        returned future with ResultInvalidTopicName.
 * @return a future that receives the lookup data, or the failure result
 */
Future<Result, LookupDataResultPtr> BinaryProtoLookupService::getPartitionMetadataAsync(
    const TopicNamePtr& topicName) {
    LookupDataResultPromisePtr metadataPromise = std::make_shared<LookupDataResultPromise>();
    if (!topicName) {
        metadataPromise->setFailed(ResultInvalidTopicName);
        return metadataPromise->getFuture();
    }

    std::string lookupName = topicName->toString();
    const auto address = serviceNameResolver_.resolveHost();
    // Once the connection attempt finishes, hand its outcome to the request sender.
    auto connectionFuture = cnxPool_.getConnectionAsync(address, address);
    connectionFuture.addListener(
        [this, lookupName, metadataPromise](Result result, const ClientConnectionWeakPtr& weakCnx) {
            sendPartitionMetadataLookupRequest(lookupName, result, weakCnx, metadataPromise);
        });
    return metadataPromise->getFuture();
}
void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::string& topicName, Result result,
const ClientConnectionWeakPtr& clientCnx,
LookupDataResultPromisePtr promise) {
if (result != ResultOk) {
promise->setFailed(result);
return;
}
LookupDataResultPromisePtr lookupPromise = std::make_shared<LookupDataResultPromise>();
ClientConnectionPtr conn = clientCnx.lock();
uint64_t requestId = newRequestId();
conn->newPartitionedMetadataLookup(topicName, requestId, lookupPromise);
lookupPromise->getFuture().addListener(std::bind(&BinaryProtoLookupService::handlePartitionMetadataLookup,
this, topicName, std::placeholders::_1,
std::placeholders::_2, clientCnx, promise));
}
/**
 * Response listener for a partitioned-topic metadata request.
 *
 * The outcome is decided solely by whether `data` is set: a non-null `data` completes
 * the promise with it, otherwise the promise is failed with `result`.
 *
 * @param topicName the topic the request was made for (used for logging only)
 * @param result result code delivered with the response
 * @param data the lookup data, or null when none was delivered
 * @param clientCnx the connection the request was sent on; not used by this function
 * @param promise the promise to complete
 */
void BinaryProtoLookupService::handlePartitionMetadataLookup(const std::string& topicName, Result result,
                                                             LookupDataResultPtr data,
                                                             const ClientConnectionWeakPtr& clientCnx,
                                                             LookupDataResultPromisePtr promise) {
    if (!data) {
        LOG_DEBUG("PartitionMetadataLookup failed for " << topicName << ", result " << result);
        promise->setFailed(result);
        return;
    }

    LOG_DEBUG("PartitionMetadataLookup response for " << topicName << ", lookup-broker-url "
                                                      << data->getBrokerUrl());
    promise->setValue(data);
}
/**
 * Produce the next request id.
 *
 * The counter is incremented while holding mutex_ and the incremented value is returned.
 */
uint64_t BinaryProtoLookupService::newRequestId() {
    Lock guard(mutex_);
    const uint64_t nextId = ++requestIdGenerator_;
    return nextId;
}
/**
 * Request the topics of a namespace.
 *
 * @param nsName the namespace to query. A null pointer fails the returned future
 *        with ResultInvalidTopicName.
 * @return a future that receives the topics, or the failure result
 */
Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespaceAsync(
    const NamespaceNamePtr& nsName) {
    NamespaceTopicsPromisePtr topicsPromise = std::make_shared<Promise<Result, NamespaceTopicsPtr>>();
    if (!nsName) {
        topicsPromise->setFailed(ResultInvalidTopicName);
        return topicsPromise->getFuture();
    }

    std::string namespaceName = nsName->toString();
    // Once the connection attempt finishes, hand its outcome to the request sender.
    auto connectionFuture = cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost());
    connectionFuture.addListener(
        [this, namespaceName, topicsPromise](Result result, const ClientConnectionWeakPtr& weakCnx) {
            sendGetTopicsOfNamespaceRequest(namespaceName, result, weakCnx, topicsPromise);
        });
    return topicsPromise->getFuture();
}
void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string& nsName, Result result,
const ClientConnectionWeakPtr& clientCnx,
NamespaceTopicsPromisePtr promise) {
if (result != ResultOk) {
promise->setFailed(result);
return;
}
ClientConnectionPtr conn = clientCnx.lock();
uint64_t requestId = newRequestId();
LOG_DEBUG("sendGetTopicsOfNamespaceRequest. requestId: " << requestId << " nsName: " << nsName);
conn->newGetTopicsOfNamespace(nsName, requestId)
.addListener(std::bind(&BinaryProtoLookupService::getTopicsOfNamespaceListener, this,
std::placeholders::_1, std::placeholders::_2, promise));
}
/**
 * Response listener for a get-topics-of-namespace request.
 *
 * @param result result code delivered with the response
 * @param topicsPtr the topics delivered with the response
 * @param promise completed with topicsPtr on ResultOk; every other result is reported
 *        to the promise as ResultLookupError
 */
void BinaryProtoLookupService::getTopicsOfNamespaceListener(Result result, NamespaceTopicsPtr topicsPtr,
                                                            NamespaceTopicsPromisePtr promise) {
    if (result == ResultOk) {
        promise->setValue(topicsPtr);
    } else {
        promise->setFailed(ResultLookupError);
    }
}
} // namespace pulsar