blob: a135ade74b40848cfc3b65c794d278a9395ef119 [file]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "ClientConnection.h"
#include <openssl/x509.h>
#include <pulsar/Authentication.h>
#include <pulsar/MessageIdBuilder.h>
#include <pulsar/ServiceInfo.h>
#include <chrono>
#include <fstream>
#include "AsioDefines.h"
#include "ClientConnectionAdaptor.h"
#include "ClientImpl.h"
#include "Commands.h"
#include "ConnectionPool.h"
#include "ConsumerImpl.h"
#include "ExecutorService.h"
#include "Future.h"
#include "LogUtils.h"
#include "MockServer.h"
#include "OpSendMsg.h"
#include "ProducerImpl.h"
#include "PulsarApi.pb.h"
#include "ResultUtils.h"
#include "ServiceNameResolver.h"
#include "ServiceURI.h"
#include "Url.h"
#include "auth/AuthOauth2.h"
#include "auth/InitialAuthData.h"
#include "checksum/ChecksumProvider.h"
#ifdef USE_ASIO
#include <asio/connect.hpp>
#include <asio/ssl/host_name_verification.hpp>
#else
#include <boost/asio/connect.hpp>
#include <boost/asio/ssl/host_name_verification.hpp>
#endif
DECLARE_LOG_OBJECT()
using namespace ASIO::ip;
namespace pulsar {
namespace {
// Streams a whole resolver result set as a space-separated list of "address:port"
// entries, so the endpoints can be logged in one shot.
static std::ostream& operator<<(std::ostream& os, const tcp::resolver::results_type& results) {
    for (auto it = results.begin(); it != results.end(); ++it) {
        const auto endpoint = it->endpoint();
        os << endpoint.address().to_string() << ":" << endpoint.port() << " ";
    }
    return os;
}
} // anonymous namespace
using proto::BaseCommand;
static const uint32_t DefaultBufferSize = 64 * 1024;
// Converts the wire-level protobuf message id into the client-facing MessageId type.
static MessageId toMessageId(const proto::MessageIdData& messageIdData) {
    auto builder = MessageIdBuilder::from(messageIdData);
    return builder.build();
}
// Convert error codes from protobuf to client API Result
static Result getResult(ServerError serverError, const std::string& message) {
switch (serverError) {
case UnknownError:
return ResultUnknownError;
case MetadataError:
return ResultBrokerMetadataError;
case ChecksumError:
return ResultChecksumError;
case PersistenceError:
return ResultBrokerPersistenceError;
case AuthenticationError:
return ResultAuthenticationError;
case AuthorizationError:
return ResultAuthorizationError;
case ConsumerBusy:
return ResultConsumerBusy;
case ServiceNotReady:
return (message.find("the broker do not have test listener") == std::string::npos)
? ResultRetryable
: ResultConnectError;
case ProducerBlockedQuotaExceededError:
return ResultProducerBlockedQuotaExceededError;
case ProducerBlockedQuotaExceededException:
return ResultProducerBlockedQuotaExceededException;
case TopicNotFound:
return ResultTopicNotFound;
case SubscriptionNotFound:
return ResultSubscriptionNotFound;
case ConsumerNotFound:
return ResultConsumerNotFound;
case UnsupportedVersionError:
return ResultUnsupportedVersionError;
case TooManyRequests:
return ResultTooManyLookupRequestException;
case TopicTerminatedError:
return ResultTopicTerminated;
case ProducerBusy:
return ResultProducerBusy;
case InvalidTopicName:
return ResultInvalidTopicName;
case IncompatibleSchema:
return ResultIncompatibleSchema;
case ConsumerAssignError:
return ResultConsumerAssignError;
case TransactionCoordinatorNotFound:
return ResultTransactionCoordinatorNotFoundError;
case InvalidTxnStatus:
return ResultInvalidTxnStatusError;
case NotAllowedError:
return ResultNotAllowedError;
case TransactionConflict:
return ResultTransactionConflict;
case TransactionNotFound:
return ResultTransactionNotFound;
case ProducerFenced:
return ResultProducerFenced;
}
// NOTE : Do not add default case in the switch above. In future if we get new cases for
// ServerError and miss them in the switch above we would like to get notified. Adding
// return here to make the compiler happy.
return ResultUnknownError;
}
inline std::ostream& operator<<(std::ostream& os, proto::ServerError error) {
os << getResult(error, "");
return os;
}
// Returns true iff `path` is non-empty and names a file that can be opened for reading.
static bool file_exists(const std::string& path) {
    return !path.empty() && std::ifstream{path}.good();
}
std::atomic<int32_t> ClientConnection::maxMessageSize_{Commands::DefaultMaxMessageSize};

/**
 * Builds a (not yet connected) connection object.
 *
 * Validates the authentication plugin and, when the service URI uses TLS, fully
 * configures the SSL context (trust store, client cert/key, hostname validation
 * and SNI) before any socket I/O happens. Throws a Result value (caught by the
 * ConnectionPool) on unrecoverable configuration errors.
 */
ClientConnection::ClientConnection(const std::string& logicalAddress, const std::string& physicalAddress,
                                   const ServiceInfo& serviceInfo, const ExecutorServicePtr& executor,
                                   const ClientConfiguration& clientConfiguration,
                                   const std::string& clientVersion, ConnectionPool& pool, size_t poolIndex)
    : operationsTimeout_(ClientImpl::getOperationTimeout(clientConfiguration)),
      authentication_(serviceInfo.authentication()),
      serverProtocolVersion_(proto::ProtocolVersion_MIN),
      executor_(executor),
      resolver_(executor_->createTcpResolver()),
      socket_(executor_->createSocket()),
      logicalAddress_(logicalAddress),
      physicalAddress_(physicalAddress),
      cnxStringPtr_(std::make_shared<std::string>("[<none> -> " + physicalAddress + "] ")),
      incomingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
      connectTimeout_(std::chrono::milliseconds(clientConfiguration.getConnectionTimeout())),
      connectTimer_(executor_->createDeadlineTimer()),
      outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
      keepAliveIntervalInSeconds_(clientConfiguration.getKeepAliveIntervalInSeconds()),
      consumerStatsRequestTimer_(executor_->createDeadlineTimer()),
      maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()),
      clientVersion_(clientVersion),
      pool_(pool),
      poolIndex_(poolIndex) {
    LOG_INFO(cnxString() << "Create ClientConnection, timeout="
                         << clientConfiguration.getConnectionTimeout());
    if (!authentication_) {
        LOG_ERROR("Invalid authentication plugin");
        // Fixed: removed the unreachable `return;` that followed this throw.
        throw ResultAuthenticationError;
    }
    if (auto oauth2Auth = std::dynamic_pointer_cast<AuthOauth2>(authentication_)) {
        // Configure the TLS trust certs file for Oauth2
        auto authData = std::dynamic_pointer_cast<AuthenticationDataProvider>(
            std::make_shared<InitialAuthData>(serviceInfo.tlsTrustCertsFilePath().value_or("")));
        oauth2Auth->getAuthData(authData);
    }
    if (serviceInfo.useTls()) {
        // TLS 1.2+ only: explicitly disable SSLv2/v3 and TLS 1.0/1.1.
        ASIO::ssl::context ctx(ASIO::ssl::context::sslv23_client);
        ctx.set_options(ASIO::ssl::context::default_workarounds | ASIO::ssl::context::no_sslv2 |
                        ASIO::ssl::context::no_sslv3 | ASIO::ssl::context::no_tlsv1 |
                        ASIO::ssl::context::no_tlsv1_1);
        Url serviceUrl;
        Url proxyUrl;
        Url::parse(physicalAddress, serviceUrl);
        proxyServiceUrl_ = clientConfiguration.getProxyServiceUrl();
        proxyProtocol_ = clientConfiguration.getProxyProtocol();
        if (proxyProtocol_ == ClientConfiguration::SNI && !proxyServiceUrl_.empty()) {
            Url::parse(proxyServiceUrl_, proxyUrl);
            isSniProxy_ = true;
            LOG_INFO("Configuring SNI Proxy-url=" << proxyServiceUrl_);
        }
        if (clientConfiguration.isTlsAllowInsecureConnection()) {
            ctx.set_verify_mode(ASIO::ssl::context::verify_none);
            isTlsAllowInsecureConnection_ = true;
        } else {
            ctx.set_verify_mode(ASIO::ssl::context::verify_peer);
            if (serviceInfo.tlsTrustCertsFilePath()) {
                const auto& trustCertFilePath = *serviceInfo.tlsTrustCertsFilePath();
                if (file_exists(trustCertFilePath)) {
                    ctx.load_verify_file(trustCertFilePath);
                } else {
                    LOG_ERROR(trustCertFilePath << ": No such trustCertFile");
                    throw ResultAuthenticationError;
                }
            } else {
                // No explicit trust store configured: fall back to the system CA bundle.
                ctx.set_default_verify_paths();
                LOG_INFO("Use " << X509_get_default_cert_file() << " as default CA path");
            }
        }
        std::string tlsCertificates = clientConfiguration.getTlsCertificateFilePath();
        std::string tlsPrivateKey = clientConfiguration.getTlsPrivateKeyFilePath();
        AuthenticationDataPtr authData;
        if (authentication_->getAuthData(authData) == ResultOk && authData->hasDataForTls()) {
            // The auth plugin supplies cert/key paths; both must exist before use.
            tlsCertificates = authData->getTlsCertificates();
            tlsPrivateKey = authData->getTlsPrivateKey();
            if (!file_exists(tlsCertificates)) {
                LOG_ERROR(tlsCertificates << ": No such tlsCertificates");
                throw ResultAuthenticationError;
            }
            // BUG FIX: the private key file must be validated too. Previously the
            // certificate path was (redundantly) checked twice and a missing key file
            // surfaced only later as an obscure OpenSSL error.
            if (!file_exists(tlsPrivateKey)) {
                LOG_ERROR(tlsPrivateKey << ": No such tlsPrivateKey");
                throw ResultAuthenticationError;
            }
            ctx.use_private_key_file(tlsPrivateKey, ASIO::ssl::context::pem);
            ctx.use_certificate_chain_file(tlsCertificates);
        } else {
            // Cert/key configured directly on the client configuration are optional:
            // only load them when both files are present.
            if (file_exists(tlsPrivateKey) && file_exists(tlsCertificates)) {
                ctx.use_private_key_file(tlsPrivateKey, ASIO::ssl::context::pem);
                ctx.use_certificate_chain_file(tlsCertificates);
            }
        }
        tlsSocket_ = ExecutorService::createTlsSocket(socket_, ctx);
        if (!clientConfiguration.isTlsAllowInsecureConnection() && clientConfiguration.isValidateHostName()) {
            LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":" << serviceUrl.port());
            // When connecting through an SNI proxy the peer certificate belongs to the
            // proxy host, so validate against that host instead of the broker's.
            std::string urlHost = isSniProxy_ ? proxyUrl.host() : serviceUrl.host();
            tlsSocket_->set_verify_callback(ASIO::ssl::host_name_verification(urlHost));
        }
        LOG_DEBUG("TLS SNI Host: " << serviceUrl.host());
        if (!SSL_set_tlsext_host_name(tlsSocket_->native_handle(), serviceUrl.host().c_str())) {
            ASIO_ERROR ec{static_cast<int>(::ERR_get_error()), ASIO::error::get_ssl_category()};
            LOG_ERROR(ec.message() << ": Error while setting TLS SNI");
            return;
        }
    }
}
// Destructor only logs; sockets, timers and buffers are owned by shared_ptr/RAII
// members and are released automatically.
ClientConnection::~ClientConnection() {
    LOG_INFO(cnxString() << "Destroyed connection to " << logicalAddress_ << "-" << poolIndex_);
}
/**
 * Handles the broker's CONNECTED response: records the negotiated protocol
 * version and max message size, cancels the connect timeout, transitions the
 * connection to Ready, arms the keep-alive timer, and fulfills the pending
 * connect promise (outside the lock).
 */
void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdConnected) {
    if (!cmdConnected.has_server_version()) {
        LOG_ERROR(cnxString() << "Server version is not set");
        close();
        return;
    }
    if (cmdConnected.has_max_message_size()) {
        // Adopt the broker-advertised frame limit (shared across all connections).
        LOG_DEBUG("Connection has max message size setting: " << cmdConnected.max_message_size());
        maxMessageSize_.store(cmdConnected.max_message_size(), std::memory_order_release);
        LOG_DEBUG("Current max message size is: " << maxMessageSize_);
    }
    Lock lock(mutex_);
    if (isClosed()) {
        LOG_INFO(cnxString() << "Connection already closed");
        return;
    }
    // The handshake completed in time, so the connect timeout is no longer needed.
    cancelTimer(*connectTimer_);
    state_ = Ready;
    serverProtocolVersion_ = cmdConnected.protocol_version();
    if (serverProtocolVersion_ >= proto::v1) {
        // Only send keep-alive probes if the broker supports it
        keepAliveTimer_ = executor_->createDeadlineTimer();
        if (keepAliveTimer_) {
            keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_));
            keepAliveTimer_->async_wait(
                [this, self{shared_from_this()}](const ASIO_ERROR& err) { handleKeepAliveTimeout(err); });
        }
    }
    // Fulfill the promise outside the lock: listeners may call back into this object.
    lock.unlock();
    connectPromise_.setValue(shared_from_this());
    if (serverProtocolVersion_ >= proto::v8) {
        // Brokers >= v8 support the consumer-stats command; start the polling timer.
        startConsumerStatsTimer(std::vector<uint64_t>());
    }
}
void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerStatsRequests) {
std::vector<Promise<Result, BrokerConsumerStatsImpl>> consumerStatsPromises;
Lock lock(mutex_);
for (int i = 0; i < consumerStatsRequests.size(); i++) {
PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap_.find(consumerStatsRequests[i]);
if (it != pendingConsumerStatsMap_.end()) {
LOG_DEBUG(cnxString() << " removing request_id " << it->first
<< " from the pendingConsumerStatsMap_");
consumerStatsPromises.push_back(it->second);
pendingConsumerStatsMap_.erase(it);
} else {
LOG_DEBUG(cnxString() << "request_id " << it->first << " already fulfilled - not removing it");
}
}
consumerStatsRequests.clear();
for (PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap_.begin();
it != pendingConsumerStatsMap_.end(); ++it) {
consumerStatsRequests.push_back(it->first);
}
// If the close operation has reset the consumerStatsRequestTimer_ then the use_count will be zero
// Check if we have a timer still before we set the request timer to pop again.
if (consumerStatsRequestTimer_) {
consumerStatsRequestTimer_->expires_after(operationsTimeout_);
consumerStatsRequestTimer_->async_wait(
[this, self{shared_from_this()}, consumerStatsRequests](const ASIO_ERROR& err) {
handleConsumerStatsTimeout(err, consumerStatsRequests);
});
}
lock.unlock();
// Complex logic since promises need to be fulfilled outside the lock
for (int i = 0; i < consumerStatsPromises.size(); i++) {
consumerStatsPromises[i].setFailed(ResultTimeout);
LOG_WARN(cnxString() << " Operation timedout, didn't get response from broker");
}
}
// Thin asio socket-option wrappers around the platform's TCP keep-alive knobs.
/// The number of unacknowledged probes to send before considering the connection dead and notifying the
/// application layer
typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPCNT> tcp_keep_alive_count;
/// The interval between subsequential keepalive probes, regardless of what the connection has exchanged in
/// the meantime
typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPINTVL> tcp_keep_alive_interval;
/// The interval between the last data packet sent (simple ACKs are not considered data) and the first
/// keepalive
/// probe; after the connection is marked to need keepalive, this counter is not used any further
#ifdef __APPLE__
// macOS spells the idle-time option TCP_KEEPALIVE instead of TCP_KEEPIDLE.
typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPALIVE> tcp_keep_alive_idle;
#else
typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPIDLE> tcp_keep_alive_idle;
#endif
/*
* TCP Connect handler
*
* if async_connect without any error, connected_ would be set to true
* at this point the connection is deemed valid to be used by clients of this class
*/
/*
 * TCP Connect handler
 *
 * if async_connect without any error, connected_ would be set to true
 * at this point the connection is deemed valid to be used by clients of this class
 *
 * On success: records the local/remote endpoints in the log prefix, tunes the
 * socket (no-delay + OS-level keep-alive), then either starts the TLS handshake
 * or jumps straight to the Pulsar handshake. On failure: cancels the connect
 * timer and closes (retryable unless the operation was aborted).
 */
void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, const tcp::endpoint& endpoint) {
    if (!err) {
        std::stringstream cnxStringStream;
        try {
            cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint()
                            << "] ";
            std::atomic_store(&cnxStringPtr_, std::make_shared<std::string>(cnxStringStream.str()));
        } catch (const ASIO_SYSTEM_ERROR& e) {
            // The peer may already have dropped the connection; querying the endpoints throws then.
            LOG_ERROR("Failed to get endpoint: " << e.what());
            close(ResultRetryable);
            return;
        }
        if (logicalAddress_ == physicalAddress_) {
            LOG_INFO(cnxString() << "Connected to broker");
        } else {
            LOG_INFO(cnxString() << "Connected to broker through proxy. Logical broker: " << logicalAddress_
                                 << ", proxy: " << proxyServiceUrl_
                                 << ", physical address:" << physicalAddress_);
        }
        Lock lock(mutex_);
        if (isClosed()) {
            LOG_INFO(cnxString() << "Connection already closed");
            return;
        }
        state_ = TcpConnected;
        lock.unlock();
        ASIO_ERROR error;
        socket_->set_option(tcp::no_delay(true), error);
        if (error) {
            LOG_WARN(cnxString() << "Socket failed to set tcp::no_delay: " << error.message());
        }
        socket_->set_option(tcp::socket::keep_alive(true), error);
        if (error) {
            LOG_WARN(cnxString() << "Socket failed to set tcp::socket::keep_alive: " << error.message());
        }
        // Start TCP keep-alive probes after connection has been idle after 1 minute. Ideally this
        // should never happen, given that we're sending our own keep-alive probes (within the TCP
        // connection) every 30 seconds
        socket_->set_option(tcp_keep_alive_idle(1 * 60), error);
        if (error) {
            LOG_DEBUG(cnxString() << "Socket failed to set tcp_keep_alive_idle: " << error.message());
        }
        // Send up to 10 probes before declaring the connection broken
        socket_->set_option(tcp_keep_alive_count(10), error);
        if (error) {
            LOG_DEBUG(cnxString() << "Socket failed to set tcp_keep_alive_count: " << error.message());
        }
        // Interval between probes: 6 seconds
        socket_->set_option(tcp_keep_alive_interval(6), error);
        if (error) {
            LOG_DEBUG(cnxString() << "Socket failed to set tcp_keep_alive_interval: " << error.message());
        }
        if (tlsSocket_) {
            if (!isTlsAllowInsecureConnection_) {
                // Sanity-check that the physical address is a well-formed URL before handshaking.
                // BUG FIX: previously a default-constructed ASIO_ERROR (shadowing the `err`
                // parameter) was logged here, which always printed a "success" code and hid
                // the actual problem; log the address that failed to parse instead.
                Url service_url;
                if (!Url::parse(physicalAddress_, service_url)) {
                    LOG_ERROR(cnxString() << "Invalid Url, unable to parse: " << physicalAddress_);
                    close();
                    return;
                }
            }
            // socket and ssl::stream objects must exist until async_handshake is done, otherwise segmentation
            // fault might happen
            tlsSocket_->async_handshake(
                ASIO::ssl::stream<tcp::socket>::client,
                [this, self{shared_from_this()}](const auto& err) { handleHandshake(err); });
        } else {
            handleHandshake(ASIO_SUCCESS);
        }
    } else {
        LOG_ERROR(cnxString() << "Failed to establish connection to " << endpoint << ": " << err.message());
        {
            std::lock_guard lock{mutex_};
            if (isClosed()) {
                return;
            }
            cancelTimer(*connectTimer_);
        }
        if (err == ASIO::error::operation_aborted) {
            // Aborted means we are shutting down deliberately: close without retry.
            close();
        } else {
            close(ResultRetryable);
        }
    }
}
/**
 * Completion handler for the (possibly no-op) TLS handshake: on success builds
 * and sends the Pulsar CONNECT command; on failure closes the connection
 * (retryable only for a truncated TLS stream).
 */
void ClientConnection::handleHandshake(const ASIO_ERROR& err) {
    if (err) {
        if (err.value() == ASIO::ssl::error::stream_truncated) {
            // The peer cut the stream mid-handshake; this is typically transient.
            LOG_WARN(cnxString() << "Handshake failed: " << err.message());
            close(ResultRetryable);
        } else {
            LOG_ERROR(cnxString() << "Handshake failed: " << err.message());
            close();
        }
        return;
    }
    // When logical != physical we are connecting via a proxy and must tell the broker.
    bool connectingThroughProxy = logicalAddress_ != physicalAddress_;
    Result result = ResultOk;
    SharedBuffer buffer;
    try {
        // newConnect may invoke the auth plugin, which can throw (e.g. token fetch failure).
        buffer = Commands::newConnect(authentication_, logicalAddress_, connectingThroughProxy,
                                      clientVersion_, result);
    } catch (const std::exception& e) {
        LOG_ERROR(cnxString() << "Failed to create Connect command: " << e.what());
        close(ResultAuthenticationError);
        return;
    }
    if (result != ResultOk) {
        LOG_ERROR(cnxString() << "Failed to establish connection: " << result);
        close(result);
        return;
    }
    // Send CONNECT command to broker
    auto self = shared_from_this();
    asyncWrite(buffer.const_asio_buffer(),
               customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& err, size_t) {
                   handleSentPulsarConnect(err, buffer);
               }));
}
// Completion handler for writing the CONNECT command: on success, schedules
// reading the broker's CONNECTED response; on error, tears the connection down.
void ClientConnection::handleSentPulsarConnect(const ASIO_ERROR& err, const SharedBuffer& buffer) {
    if (isClosed()) {
        return;
    }
    if (!err) {
        // CONNECT was flushed; now wait for the CONNECTED command from the broker.
        readNextCommand();
        return;
    }
    LOG_ERROR(cnxString() << "Failed to establish connection: " << err.message());
    close();
}
// Completion handler for writing an AUTH_RESPONSE; nothing further is expected
// on success, a write failure closes the connection.
void ClientConnection::handleSentAuthResponse(const ASIO_ERROR& err, const SharedBuffer& buffer) {
    if (isClosed() || !err) {
        return;
    }
    LOG_WARN(cnxString() << "Failed to send auth response: " << err.message());
    close();
}
/*
* Async method to establish TCP connection with broker
*
* tcpConnectCompletionHandler is notified when the result of this call is available.
*
*/
/*
 * Async method to establish TCP connection with broker
 *
 * tcpConnectCompletionHandler is notified when the result of this call is available.
 *
 * Resolves the target host (the SNI proxy when configured, otherwise the
 * physical broker address) and continues in handleResolve().
 */
void ClientConnection::tcpConnectAsync() {
    if (isClosed()) {
        return;
    }
    Url service_url;
    std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : physicalAddress_;
    if (!Url::parse(hostUrl, service_url)) {
        // BUG FIX: previously a default-constructed ASIO_ERROR was logged here, which
        // always printed a "success" code; log the URL that failed to parse instead.
        LOG_ERROR(cnxString() << "Invalid Url, unable to parse: " << hostUrl);
        close();
        return;
    }
    if (service_url.protocol() != "pulsar" && service_url.protocol() != "pulsar+ssl") {
        LOG_ERROR(cnxString() << "Invalid Url protocol '" << service_url.protocol()
                              << "'. Valid values are 'pulsar' and 'pulsar+ssl'");
        close();
        return;
    }
    LOG_DEBUG(cnxString() << "Resolving " << service_url.host() << ":" << service_url.port());
    resolver_->async_resolve(
        service_url.host(), std::to_string(service_url.port()),
        [this, self{shared_from_this()}](auto err, const auto& results) { handleResolve(err, results); });
}
/**
 * DNS resolution callback: on success arms the connect-timeout timer and
 * starts the async TCP connect over the resolved endpoints; on failure closes
 * the connection.
 */
void ClientConnection::handleResolve(ASIO_ERROR err, const tcp::resolver::results_type& results) {
    if (err) {
        // BUG FIX: the ternary was inverted — it logged the (possibly empty) proxy URL
        // for direct connections and the generic connection string when an SNI proxy was
        // actually the host being resolved. Log the proxy URL only in the SNI case.
        std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : cnxString();
        LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message());
        close();
        return;
    }
    if (!results.empty()) {
        LOG_DEBUG(cnxString() << "Resolved " << results.size() << " endpoints");
        for (const auto& entry : results) {
            const auto& ep = entry.endpoint();
            LOG_DEBUG(cnxString() << " " << ep.address().to_string() << ":" << ep.port());
        }
    }
    std::lock_guard lock{mutex_};
    if (isClosed()) {
        return;
    }
    // Arm the connect timeout: if the connection is neither closed nor Ready when it
    // fires, the connect attempt is abandoned.
    connectTimer_->expires_after(connectTimeout_);
    connectTimer_->async_wait([this, results, self{shared_from_this()}](const auto& err) {
        if (err) {
            return;  // timer cancelled (connection completed or closed)
        }
        Lock lock{mutex_};
        if (!isClosed() && state_ != Ready) {
            LOG_ERROR(cnxString() << "Connection to " << results << " was not established in "
                                  << connectTimeout_.count() << " ms");
            lock.unlock();
            close();
        }  // else: the connection is closed or already established
    });
    // Try each resolved endpoint in order until one connects.
    ASIO::async_connect(
        *socket_, results,
        [this, self{shared_from_this()}](const ASIO_ERROR& err, const tcp::endpoint& endpoint) {
            handleTcpConnected(err, endpoint);
        });
}
void ClientConnection::readNextCommand() {
const static uint32_t minReadSize = sizeof(uint32_t);
auto self = shared_from_this();
asyncReceive(incomingBuffer_.asio_buffer(),
customAllocReadHandler([this, self](const ASIO_ERROR& err, size_t bytesTransferred) {
handleRead(err, bytesTransferred, minReadSize);
}));
}
/**
 * Socket read completion handler.
 *
 * @param err             asio error from the read
 * @param bytesTransferred bytes actually read into incomingBuffer_
 * @param minReadSize     minimum bytes needed before the buffer can be parsed;
 *                        if fewer arrived, another read is issued for the rest.
 */
void ClientConnection::handleRead(const ASIO_ERROR& err, size_t bytesTransferred, uint32_t minReadSize) {
    if (isClosed()) {
        return;
    }
    // Update buffer write idx with new data
    incomingBuffer_.bytesWritten(bytesTransferred);
    if (err || bytesTransferred == 0) {
        if (err == ASIO::error::operation_aborted) {
            LOG_DEBUG(cnxString() << "Read operation was canceled: " << err.message());
        } else if (bytesTransferred == 0 || err == ASIO::error::eof) {
            LOG_DEBUG(cnxString() << "Server closed the connection: " << err.message());
        } else {
            LOG_ERROR(cnxString() << "Read operation failed: " << err.message());
        }
        close(ResultDisconnected);
    } else if (bytesTransferred < minReadSize) {
        // Read the remaining part, use a slice of buffer to write on the next
        // region
        SharedBuffer buffer = incomingBuffer_.slice(bytesTransferred);
        auto self = shared_from_this();
        auto nextMinReadSize = minReadSize - bytesTransferred;
        asyncReceive(buffer.asio_buffer(),
                     customAllocReadHandler(
                         [this, self, nextMinReadSize](const ASIO_ERROR& err, size_t bytesTransferred) {
                             handleRead(err, bytesTransferred, nextMinReadSize);
                         }));
    } else {
        // Enough bytes are available to parse at least one frame header.
        processIncomingBuffer();
    }
}
/**
 * Parses as many complete protocol frames as are available in incomingBuffer_.
 *
 * Frame layout: [4-byte frame size][4-byte command size][command] and, for
 * MESSAGE frames, optional broker entry metadata, an optional CRC32C checksum,
 * message metadata and the payload. Partial frames trigger another async read
 * for exactly the missing bytes; any parse failure closes the connection.
 */
void ClientConnection::processIncomingBuffer() {
    // Process all the available frames from the incoming buffer
    while (incomingBuffer_.readableBytes() >= sizeof(uint32_t)) {
        // Extract message frames from incoming buffer
        // At this point we have at least 4 bytes in the buffer
        uint32_t frameSize = incomingBuffer_.readUnsignedInt();
        if (frameSize > incomingBuffer_.readableBytes()) {
            // We don't have the entire frame yet
            const uint32_t bytesToReceive = frameSize - incomingBuffer_.readableBytes();
            // Rollback the reading of frameSize (when the frame will be complete,
            // we'll read it again
            incomingBuffer_.rollback(sizeof(uint32_t));
            if (bytesToReceive > incomingBuffer_.writableBytes()) {
                // Need to allocate a buffer big enough for the frame
                uint32_t newBufferSize = std::max<uint32_t>(DefaultBufferSize, frameSize + sizeof(uint32_t));
                incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_, newBufferSize);
            }
            auto self = shared_from_this();
            asyncReceive(incomingBuffer_.asio_buffer(),
                         customAllocReadHandler(
                             [this, self, bytesToReceive](const ASIO_ERROR& err, size_t bytesTransferred) {
                                 handleRead(err, bytesTransferred, bytesToReceive);
                             }));
            return;
        }
        // At this point, we have at least one complete frame available in the buffer
        uint32_t cmdSize = incomingBuffer_.readUnsignedInt();
        proto::BaseCommand incomingCmd;
        if (!incomingCmd.ParseFromArray(incomingBuffer_.data(), cmdSize)) {
            LOG_ERROR(cnxString() << "Error parsing protocol buffer command");
            close(ResultDisconnected);
            return;
        }
        incomingBuffer_.consume(cmdSize);
        if (incomingCmd.type() == BaseCommand::MESSAGE) {
            // Parse message metadata and extract payload
            proto::MessageMetadata msgMetadata;
            proto::BrokerEntryMetadata brokerEntryMetadata;
            // read checksum
            // remainingBytes = everything in the frame after the command (the +4 is the cmdSize field)
            uint32_t remainingBytes = frameSize - (cmdSize + 4);
            auto readerIndex = incomingBuffer_.readerIndex();
            if (incomingBuffer_.readUnsignedShort() == Commands::magicBrokerEntryMetadata) {
                // broker entry metadata is present
                uint32_t brokerEntryMetadataSize = incomingBuffer_.readUnsignedInt();
                if (!brokerEntryMetadata.ParseFromArray(incomingBuffer_.data(), brokerEntryMetadataSize)) {
                    LOG_ERROR(cnxString()
                              << "[consumer id " << incomingCmd.message().consumer_id()
                              << ", message ledger id " << incomingCmd.message().message_id().ledgerid()
                              << ", entry id " << incomingCmd.message().message_id().entryid()
                              << "] Error parsing broker entry metadata");
                    close(ResultDisconnected);
                    return;
                }
                // Skip past the magic (2), the size field (4) and the metadata itself.
                incomingBuffer_.setReaderIndex(readerIndex + 2 + 4 + brokerEntryMetadataSize);
                remainingBytes -= (2 + 4 + brokerEntryMetadataSize);
            } else {
                // No broker entry metadata: rewind the 2-byte magic probe.
                incomingBuffer_.setReaderIndex(readerIndex);
            }
            bool isChecksumValid = verifyChecksum(incomingBuffer_, remainingBytes, incomingCmd);
            uint32_t metadataSize = incomingBuffer_.readUnsignedInt();
            if (!msgMetadata.ParseFromArray(incomingBuffer_.data(), metadataSize)) {
                LOG_ERROR(cnxString()
                          << "[consumer id " << incomingCmd.message().consumer_id()  //
                          << ", message ledger id " << incomingCmd.message().message_id().ledgerid()  //
                          << ", entry id " << incomingCmd.message().message_id().entryid()
                          << "] Error parsing message metadata");
                close(ResultDisconnected);
                return;
            }
            incomingBuffer_.consume(metadataSize);
            remainingBytes -= (4 + metadataSize);
            // Whatever is left of the frame is the message payload.
            uint32_t payloadSize = remainingBytes;
            SharedBuffer payload = SharedBuffer::copy(incomingBuffer_.data(), payloadSize);
            incomingBuffer_.consume(payloadSize);
            handleIncomingMessage(incomingCmd.message(), isChecksumValid, brokerEntryMetadata, msgMetadata,
                                  payload);
        } else {
            handleIncomingCommand(incomingCmd);
        }
    }
    if (incomingBuffer_.readableBytes() > 0) {
        // We still have 1 to 3 bytes from the next frame
        assert(incomingBuffer_.readableBytes() < sizeof(uint32_t));
        // Restart with a new buffer and copy the few bytes at the beginning
        incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_, DefaultBufferSize);
        // At least we need to read 4 bytes to have the complete frame size
        uint32_t minReadSize = sizeof(uint32_t) - incomingBuffer_.readableBytes();
        auto self = shared_from_this();
        asyncReceive(
            incomingBuffer_.asio_buffer(),
            customAllocReadHandler([this, self, minReadSize](const ASIO_ERROR& err, size_t bytesTransferred) {
                handleRead(err, bytesTransferred, minReadSize);
            }));
        return;
    }
    // We have read everything we had in the buffer
    // Rollback the indexes to reuse the same buffer
    incomingBuffer_.reset();
    readNextCommand();
}
/**
 * Verifies the optional CRC32C checksum preceding the metadata+payload section.
 *
 * If the 2-byte magic is present, consumes the magic and the 4-byte stored
 * checksum (adjusting remainingBytes accordingly) and compares against the
 * computed checksum of the rest of the frame; otherwise rewinds the probe and
 * reports valid. NOTE: the parameter deliberately shadows the member of the
 * same name — all reads here go through the parameter.
 */
bool ClientConnection::verifyChecksum(SharedBuffer& incomingBuffer_, uint32_t& remainingBytes,
                                      proto::BaseCommand& incomingCmd) {
    int readerIndex = incomingBuffer_.readerIndex();
    bool isChecksumValid = true;
    if (incomingBuffer_.readUnsignedShort() == Commands::magicCrc32c) {
        uint32_t storedChecksum = incomingBuffer_.readUnsignedInt();
        remainingBytes -= (2 + 4) /* subtract size of checksum itself */;
        // compute metadata-payload checksum
        int metadataPayloadSize = remainingBytes;
        uint32_t computedChecksum = computeChecksum(0, incomingBuffer_.data(), metadataPayloadSize);
        // verify checksum
        isChecksumValid = (storedChecksum == computedChecksum);
        if (!isChecksumValid) {
            // BUG FIX: added the missing separators/spaces so the two checksum values do
            // not run together with the labels in the log output.
            LOG_ERROR("[consumer id "
                      << incomingCmd.message().consumer_id()                                       //
                      << ", message ledger id " << incomingCmd.message().message_id().ledgerid()   //
                      << ", entry id " << incomingCmd.message().message_id().entryid()             //
                      << ", stored-checksum " << storedChecksum << ", computedChecksum " << computedChecksum  //
                      << "] Checksum verification failed");
        }
    } else {
        // No checksum magic: rewind the 2-byte probe so the caller reads the metadata size next.
        incomingBuffer_.setReaderIndex(readerIndex);
    }
    return isChecksumValid;
}
// Dispatches an ACTIVE_CONSUMER_CHANGE notification to the owning consumer, if
// it is still alive; stale weak references are pruned from the map.
void ClientConnection::handleActiveConsumerChange(const proto::CommandActiveConsumerChange& change) {
    LOG_DEBUG(cnxString() << "Received notification about active consumer change, consumer_id: "
                          << change.consumer_id() << " isActive: " << change.is_active());
    Lock lock(mutex_);
    auto it = consumers_.find(change.consumer_id());
    if (it == consumers_.end()) {
        LOG_DEBUG(cnxString() << "Got invalid consumer Id in " << change.consumer_id()
                              << " -- isActive: " << change.is_active());
        return;
    }
    auto consumer = it->second.lock();
    if (!consumer) {
        // The consumer object is gone; drop the stale registration.
        consumers_.erase(change.consumer_id());
        LOG_DEBUG(cnxString() << "Ignoring incoming message for already destroyed consumer "
                              << change.consumer_id());
        return;
    }
    // Notify outside the lock to avoid re-entrancy deadlocks.
    lock.unlock();
    consumer->activeConsumerChanged(change.is_active());
}
// Routes a received MESSAGE frame to the consumer it belongs to; stale weak
// references are pruned, unknown consumer ids are ignored with a debug log.
void ClientConnection::handleIncomingMessage(const proto::CommandMessage& msg, bool isChecksumValid,
                                             proto::BrokerEntryMetadata& brokerEntryMetadata,
                                             proto::MessageMetadata& msgMetadata, SharedBuffer& payload) {
    LOG_DEBUG(cnxString() << "Received a message from the server for consumer: " << msg.consumer_id());
    Lock lock(mutex_);
    auto it = consumers_.find(msg.consumer_id());
    if (it == consumers_.end()) {
        LOG_DEBUG(cnxString() << "Got invalid consumer Id in "  //
                              << msg.consumer_id() << " -- msg: " << msgMetadata.sequence_id());
        return;
    }
    auto consumer = it->second.lock();
    if (!consumer) {
        // The consumer object is gone; drop the stale registration.
        consumers_.erase(msg.consumer_id());
        LOG_DEBUG(cnxString() << "Ignoring incoming message for already destroyed consumer "
                              << msg.consumer_id());
        return;
    }
    // Unlock the mutex before notifying the consumer of the
    // new received message
    lock.unlock();
    consumer->messageReceived(shared_from_this(), msg, isChecksumValid, brokerEntryMetadata,
                              msgMetadata, payload);
}
/**
 * Top-level dispatch for every non-MESSAGE command received on this connection.
 *
 * Behavior depends on connection state: before the Pulsar handshake only
 * CONNECTED is accepted; once Ready, each command type is forwarded to its
 * dedicated handler. Any data received while Ready clears the pending-ping flag
 * (except a PING itself, which we merely answer).
 */
void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
    LOG_DEBUG(cnxString() << "Handling incoming command: " << Commands::messageType(incomingCmd.type()));
    switch (state_.load()) {
        case Pending: {
            LOG_ERROR(cnxString() << "Connection is not ready yet");
            break;
        }
        case TcpConnected: {
            // Handle Pulsar Connected
            if (incomingCmd.type() != BaseCommand::CONNECTED) {
                // Wrong cmd
                close();
            } else {
                handlePulsarConnected(incomingCmd.connected());
            }
            break;
        }
        case Disconnected: {
            LOG_ERROR(cnxString() << "Connection already disconnected");
            break;
        }
        case Ready: {
            // Since we are receiving data from the connection, we are assuming that for now the
            // connection is still working well.
            if (incomingCmd.type() != BaseCommand::PING) {
                havePendingPingRequest_ = false;
            }
            // Handle normal commands
            switch (incomingCmd.type()) {
                case BaseCommand::SEND_RECEIPT:
                    handleSendReceipt(incomingCmd.send_receipt());
                    break;
                case BaseCommand::SEND_ERROR:
                    handleSendError(incomingCmd.send_error());
                    break;
                case BaseCommand::SUCCESS:
                    handleSuccess(incomingCmd.success());
                    break;
                case BaseCommand::PARTITIONED_METADATA_RESPONSE:
                    handlePartitionedMetadataResponse(incomingCmd.partitionmetadataresponse());
                    break;
                case BaseCommand::CONSUMER_STATS_RESPONSE:
                    handleConsumerStatsResponse(incomingCmd.consumerstatsresponse());
                    break;
                case BaseCommand::LOOKUP_RESPONSE:
                    handleLookupTopicRespose(incomingCmd.lookuptopicresponse());
                    break;
                case BaseCommand::PRODUCER_SUCCESS:
                    handleProducerSuccess(incomingCmd.producer_success());
                    break;
                case BaseCommand::ERROR:
                    handleError(incomingCmd.error());
                    break;
                case BaseCommand::TOPIC_MIGRATED:
                    handleTopicMigrated(incomingCmd.topicmigrated());
                    break;
                case BaseCommand::CLOSE_PRODUCER:
                    handleCloseProducer(incomingCmd.close_producer());
                    break;
                case BaseCommand::CLOSE_CONSUMER:
                    handleCloseConsumer(incomingCmd.close_consumer());
                    break;
                case BaseCommand::PING:
                    // Respond to ping request
                    LOG_DEBUG(cnxString() << "Replying to ping command");
                    sendCommand(Commands::newPong());
                    break;
                case BaseCommand::PONG:
                    LOG_DEBUG(cnxString() << "Received response to ping message");
                    break;
                case BaseCommand::AUTH_CHALLENGE:
                    handleAuthChallenge();
                    break;
                case BaseCommand::ACTIVE_CONSUMER_CHANGE:
                    handleActiveConsumerChange(incomingCmd.active_consumer_change());
                    break;
                case BaseCommand::GET_LAST_MESSAGE_ID_RESPONSE:
                    handleGetLastMessageIdResponse(incomingCmd.getlastmessageidresponse());
                    break;
                case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE:
                    handleGetTopicOfNamespaceResponse(incomingCmd.gettopicsofnamespaceresponse());
                    break;
                case BaseCommand::GET_SCHEMA_RESPONSE:
                    handleGetSchemaResponse(incomingCmd.getschemaresponse());
                    break;
                case BaseCommand::ACK_RESPONSE:
                    handleAckResponse(incomingCmd.ackresponse());
                    break;
                default:
                    // Unknown command type while Ready: treat as a protocol violation.
                    LOG_WARN(cnxString() << "Received invalid message from server");
                    close(ResultDisconnected);
                    break;
            }
        }
    }
}
// Registers a pending consumer-stats request and sends the CONSUMER_STATS
// command (unless a mock server intercepts it); returns the future for the
// broker's response.
Future<Result, BrokerConsumerStatsImpl> ClientConnection::newConsumerStats(uint64_t consumerId,
                                                                           uint64_t requestId) {
    Promise<Result, BrokerConsumerStatsImpl> promise;
    {
        Lock lock(mutex_);
        if (isClosed()) {
            lock.unlock();
            LOG_ERROR(cnxString() << " Client is not connected to the broker");
            promise.setFailed(ResultNotConnected);
            return promise.getFuture();
        }
        pendingConsumerStatsMap_.emplace(requestId, promise);
    }
    // A mock server (tests only) may consume the request instead of the real broker.
    const bool handledByMock = mockingRequests_.load(std::memory_order_acquire) &&
                               mockServer_ != nullptr &&
                               mockServer_->sendRequest("CONSUMER_STATS", requestId);
    if (!handledByMock) {
        sendCommand(Commands::newConsumerStats(consumerId, requestId));
    }
    return promise.getFuture();
}
void ClientConnection::newTopicLookup(const std::string& topicName, bool authoritative,
                                      const std::string& listenerName, uint64_t requestId,
                                      const LookupDataResultPromisePtr& promise) {
    // Thin wrapper: build the LOOKUP command and funnel it through newLookup().
    auto cmd = Commands::newLookup(topicName, authoritative, requestId, listenerName);
    newLookup(cmd, requestId, "LOOKUP", promise);
}
void ClientConnection::newPartitionedMetadataLookup(const std::string& topicName, uint64_t requestId,
                                                    const LookupDataResultPromisePtr& promise) {
    // Thin wrapper: build the PARTITIONED_METADATA command and reuse newLookup().
    auto cmd = Commands::newPartitionMetadataRequest(topicName, requestId);
    newLookup(cmd, requestId, "PARTITIONED_METADATA", promise);
}
// Common path for LOOKUP / PARTITIONED_METADATA requests.
// Registers a pending request (with a timeout callback), enforces the
// per-connection cap on concurrent lookups, and then either lets the mock
// server answer or writes the command to the broker.
void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, const char* requestType,
                                 const LookupDataResultPromisePtr& promise) {
    Lock lock(mutex_);
    if (isClosed()) {
        lock.unlock();
        promise->setFailed(ResultNotConnected);
        return;
    } else if (numOfPendingLookupRequest_ >= maxPendingLookupRequest_) {
        // Back-pressure: too many lookups already in flight on this connection.
        lock.unlock();
        promise->setFailed(ResultTooManyLookupRequestException);
        return;
    }
    // The timeout callback captures a weak_ptr so a timed-out request does not
    // keep the connection alive; it also releases the pending-lookup slot.
    auto request = insertRequest(
        pendingLookupRequests_, requestId, [weakSelf{weak_from_this()}, requestId, requestType]() {
            if (auto self = weakSelf.lock()) {
                LOG_WARN(self->cnxString()
                         << requestType << " request timeout to broker, req_id: " << requestId);
                self->numOfPendingLookupRequest_--;
            }
        });
    // Bridge the internal request future onto the caller-supplied promise.
    request->getFuture().addListener([promise](Result result, const LookupDataResultPtr& lookupDataResult) {
        if (result == ResultOk) {
            promise->setValue(lookupDataResult);
        } else {
            promise->setFailed(result);
        }
    });
    numOfPendingLookupRequest_++;
    lock.unlock();
    LOG_DEBUG(cnxString() << "Inserted lookup request " << requestType << " (req_id: " << requestId << ")");
    // When request mocking is enabled and the mock server accepts the request,
    // skip the real network write.
    if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr &&
        mockServer_->sendRequest(requestType, requestId)) {
        return;
    }
    sendCommand(cmd);
}
void ClientConnection::sendCommand(const SharedBuffer& cmd) {
    // Serialize writes: the first writer dispatches directly to the io thread;
    // anyone arriving while a write is in flight queues the buffer instead
    // (drained by sendPendingCommands()).
    Lock lock(mutex_);
    const bool wasIdle = (pendingWriteOperations_++ == 0);
    if (!wasIdle) {
        pendingWriteBuffers_.push_back(cmd);
        return;
    }
    executor_->dispatch([this, cmd, self{shared_from_this()}] { sendCommandInternal(cmd); });
}
void ClientConnection::sendCommandInternal(const SharedBuffer& cmd) {
auto self = shared_from_this();
asyncWrite(cmd.const_asio_buffer(),
customAllocWriteHandler([this, self, cmd](const ASIO_ERROR& err, size_t bytesTransferred) {
handleSend(err, cmd);
}));
}
// Send a produced message. Shares the single-writer discipline with
// sendCommand(): if a write is already in flight, queue the arguments and let
// sendPendingCommands() serialize them later.
void ClientConnection::sendMessage(const std::shared_ptr<SendArguments>& args) {
    Lock lock(mutex_);
    if (pendingWriteOperations_++ > 0) {
        // A write is in flight; defer. The std::any queue stores SendArguments
        // here (vs SharedBuffer for plain commands).
        pendingWriteBuffers_.emplace_back(args);
        return;
    }
    auto self = shared_from_this();
    auto sendMessageInternal = [this, self, args] {
        BaseCommand outgoingCmd;
        auto buffer = Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args);
        // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the
        // callback is called, an invalid buffer range might be passed to the underlying socket send.
        asyncWrite(buffer, customAllocWriteHandler(
                               [this, self, buffer](const ASIO_ERROR& err, size_t bytesTransferred) {
                                   handleSendPair(err);
                               }));
    };
    // Hop to the io thread; socket operations must run there.
    executor_->dispatch(sendMessageInternal);
}
void ClientConnection::handleSend(const ASIO_ERROR& err, const SharedBuffer&) {
    // Completion callback for a single-buffer command write.
    if (isClosed()) {
        return;
    }
    if (!err) {
        // Success: drain any queued writes.
        sendPendingCommands();
        return;
    }
    LOG_WARN(cnxString() << "Could not send message on connection: " << err << " " << err.message());
    close(ResultDisconnected);
}
void ClientConnection::handleSendPair(const ASIO_ERROR& err) {
    // Completion callback for a paired (header + payload) message write.
    if (isClosed()) {
        return;
    }
    if (!err) {
        // Success: drain any queued writes.
        sendPendingCommands();
        return;
    }
    LOG_WARN(cnxString() << "Could not send pair message on connection: " << err << " " << err.message());
    close(ResultDisconnected);
}
// Drain one queued write after a completed write. The queue holds std::any
// values that are either a SharedBuffer (plain command) or a
// shared_ptr<SendArguments> (produced message); dispatch on the stored type.
void ClientConnection::sendPendingCommands() {
    Lock lock(mutex_);
    if (--pendingWriteOperations_ > 0) {
        assert(!pendingWriteBuffers_.empty());
        auto any = pendingWriteBuffers_.front();
        pendingWriteBuffers_.pop_front();
        auto self = shared_from_this();
        if (any.type() == typeid(SharedBuffer)) {
            SharedBuffer buffer = std::any_cast<SharedBuffer>(any);
            asyncWrite(buffer.const_asio_buffer(),
                       customAllocWriteHandler(
                           [this, self, buffer](const ASIO_ERROR& err, size_t) { handleSend(err, buffer); }));
        } else {
            assert(any.type() == typeid(std::shared_ptr<SendArguments>));
            auto args = std::any_cast<std::shared_ptr<SendArguments>>(any);
            BaseCommand outgoingCmd;
            PairSharedBuffer buffer =
                Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args);
            // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the
            // callback is called, an invalid buffer range might be passed to the underlying socket send.
            asyncWrite(buffer, customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& err, size_t) {
                           handleSendPair(err);
                       }));
        }
    } else {
        // No more pending writes
        outgoingBuffer_.reset();
    }
}
// Send a request command that the broker answers by request id (e.g.
// PRODUCER, SUBSCRIBE, ACK with response). Registers the request with a
// timeout log callback, then sends the command (or routes it to the mock
// server when mocking is enabled).
Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuffer& cmd, int requestId,
                                                                 const char* requestType) {
    Lock lock(mutex_);
    if (isClosed()) {
        lock.unlock();
        LOG_DEBUG(cnxString() << "Fail " << requestType << "(req_id: " << requestId
                              << ") to a closed connection");
        Promise<Result, ResponseData> promise;
        promise.setFailed(ResultNotConnected);
        return promise.getFuture();
    }
    // The timeout callback captures copies (strings, address) rather than
    // `this`, so it is safe even if the connection is destroyed first.
    auto request = insertRequest(
        pendingRequests_, requestId,
        [cnxString{cnxString()}, physicalAddress{physicalAddress_}, requestId, requestType]() {
            LOG_WARN(cnxString << "Network request timeout to broker, remote: " << physicalAddress
                               << ", req_id: " << requestId << ", request: " << requestType);
        });
    lock.unlock();
    LOG_DEBUG(cnxString() << "Inserted request " << requestType << " (req_id: " << requestId << ")");
    if (mockingRequests_.load(std::memory_order_acquire)) {
        // Fall back to the real broker if the mock server is missing or
        // declines to handle this request type.
        if (mockServer_ == nullptr) {
            LOG_WARN(cnxString() << "Mock server is unexpectedly null when processing " << requestType);
            sendCommand(cmd);
        } else if (!mockServer_->sendRequest(requestType, requestId)) {
            sendCommand(cmd);
        }
    } else {
        sendCommand(cmd);
    }
    return request->getFuture();
}
// Keep-alive timer callback. If the previous PING was never answered, the
// broker is considered dead and the connection is closed; otherwise send a
// new PING and re-arm the timer.
void ClientConnection::handleKeepAliveTimeout(const ASIO_ERROR& ec) {
    // `ec` is set when the timer was cancelled (e.g. by close()).
    if (isClosed() || ec) {
        return;
    }
    if (havePendingPingRequest_) {
        LOG_WARN(cnxString() << "Forcing connection to close after keep-alive timeout");
        close(ResultDisconnected);
    } else {
        // Send keep alive probe to peer
        LOG_DEBUG(cnxString() << "Sending ping message");
        havePendingPingRequest_ = true;
        sendCommand(Commands::newPing());
        // If the close operation has already called the keepAliveTimer_.reset() then the use_count will
        // be zero And we do not attempt to dereference the pointer.
        Lock lock(mutex_);
        if (keepAliveTimer_) {
            keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_));
            keepAliveTimer_->async_wait(
                [this, self{shared_from_this()}](const auto& err) { handleKeepAliveTimeout(err); });
        }
        lock.unlock();
    }
}
void ClientConnection::handleConsumerStatsTimeout(const ASIO_ERROR& ec,
                                                  const std::vector<uint64_t>& consumerStatsRequests) {
    // A set error code means the timer was cancelled; otherwise re-arm it.
    if (!ec) {
        startConsumerStatsTimer(consumerStatsRequests);
        return;
    }
    LOG_DEBUG(cnxString() << " Ignoring timer cancelled event, code[" << ec << "]");
}
// Close the connection (idempotent): transition to Disconnected, cancel
// timers, remove the connection from the pool, shut down the socket on the io
// thread, notify producers/consumers, and fail all pending requests.
// Returns a future completed once the socket (and TLS shutdown) is done.
const std::future<void>& ClientConnection::close(Result result, bool switchCluster) {
    Lock lock(mutex_);
    if (closeFuture_) {
        // Already closing/closed: just (re-)fail the connect promise.
        connectPromise_.setFailed(result);
        return *closeFuture_;
    }
    auto promise = std::make_shared<std::promise<void>>();
    closeFuture_ = promise->get_future();
    // The atomic update on state_ guarantees the previous modification on closeFuture_ is visible once the
    // atomic read on state_ returns Disconnected `isClosed()`.
    // However, it cannot prevent the race like:
    // 1. thread 1: Check `isClosed()`, which returns false.
    // 2. thread 2: call `close()`, now, `state_` becomes Disconnected, and `closeFuture_` is set.
    // 3. thread 1: post the `async_write` to the `io_context`,
    // 4. io thread: call `socket_->close()`
    // 5. io thread: execute `async_write` on `socket_`, which has been closed
    // However, even the race happens, it's still safe because all the socket operations happen in the same
    // io thread, the `async_write` operation will simply fail with an error, no crash will happen.
    state_ = Disconnected;
    // Move the internal fields to process them after `mutex_` was unlocked
    auto consumers = std::move(consumers_);
    auto producers = std::move(producers_);
    auto pendingRequests = std::move(pendingRequests_);
    auto pendingLookupRequests = std::move(pendingLookupRequests_);
    auto pendingConsumerStatsMap = std::move(pendingConsumerStatsMap_);
    auto pendingGetLastMessageIdRequests = std::move(pendingGetLastMessageIdRequests_);
    auto pendingGetNamespaceTopicsRequests = std::move(pendingGetNamespaceTopicsRequests_);
    auto pendingGetSchemaRequests = std::move(pendingGetSchemaRequests_);
    numOfPendingLookupRequest_ = 0;
    if (keepAliveTimer_) {
        cancelTimer(*keepAliveTimer_);
        keepAliveTimer_.reset();
    }
    if (consumerStatsRequestTimer_) {
        cancelTimer(*consumerStatsRequestTimer_);
        consumerStatsRequestTimer_.reset();
    }
    cancelTimer(*connectTimer_);
    lock.unlock();
    int refCount = weak_from_this().use_count();
    // Retryable disconnects (and pool-driven close) log at INFO; real errors at ERROR.
    if (result != ResultAlreadyClosed /* closed by the pool */ && !isResultRetryable(result)) {
        LOG_ERROR(cnxString() << "Connection closed with " << result << " (refCnt: " << refCount << ")");
    } else {
        LOG_INFO(cnxString() << "Connection disconnected (refCnt: " << refCount << ")");
    }
    // Remove the connection from the pool before completing any promise
    pool_.remove(logicalAddress_, physicalAddress_, poolIndex_, this);
    // Close the socket after removing itself from the pool so that other requests won't be able to acquire
    // this connection after the socket is closed.
    executor_->dispatch([this, promise, self{shared_from_this()}] {
        // According to asio document, ip::tcp::socket and ssl::stream are unsafe as shared objects, so the
        // methods must be called within the same implicit or explicit strand.
        // The implementation of `ExecutorService` guarantees the internal `io_context::run()` is only called
        // in one thread, so we can safely call the socket methods without posting to a strand instance.
        ASIO_ERROR err;
        socket_->shutdown(ASIO::socket_base::shutdown_both, err);
        socket_->close(err);
        if (err) {
            LOG_WARN(cnxString() << "Failed to close socket: " << err.message());
        }
        if (tlsSocket_) {
            // TLS needs an async shutdown handshake before the promise completes.
            auto tlsSocket = tlsSocket_;
            tlsSocket->async_shutdown([promise, self, tlsSocket](const auto&) { promise->set_value(); });
        } else {
            promise->set_value();
        }
    });
    auto self = shared_from_this();
    // Notify all producers/consumers that were attached to this connection.
    for (ProducersMap::iterator it = producers.begin(); it != producers.end(); ++it) {
        auto producer = it->second.lock();
        if (producer) {
            producer->handleDisconnection(result, self);
        }
    }
    for (ConsumersMap::iterator it = consumers.begin(); it != consumers.end(); ++it) {
        auto consumer = it->second.lock();
        if (consumer) {
            if (switchCluster) {
                consumer->onClusterSwitching();
            }
            consumer->handleDisconnection(result, self);
        }
    }
    self.reset();
    connectPromise_.setFailed(result);
    // Fail all pending requests after releasing the lock.
    for (auto& kv : pendingRequests) {
        kv.second->fail(result);
    }
    for (auto& kv : pendingLookupRequests) {
        kv.second->fail(result);
    }
    for (auto& kv : pendingConsumerStatsMap) {
        LOG_ERROR(cnxString() << " Closing Client Connection, please try again later");
        kv.second.setFailed(result);
    }
    for (auto& kv : pendingGetLastMessageIdRequests) {
        kv.second->fail(result);
    }
    for (auto& kv : pendingGetNamespaceTopicsRequests) {
        kv.second->fail(result);
    }
    for (auto& kv : pendingGetSchemaRequests) {
        kv.second->fail(result);
    }
    return *closeFuture_;
}
bool ClientConnection::isClosed() const { return state_ == Disconnected; }
Future<Result, ClientConnectionWeakPtr> ClientConnection::getConnectFuture() {
    // Future resolved once the connection handshake completes or fails.
    return connectPromise_.getFuture();
}
void ClientConnection::registerProducer(int producerId, const ProducerImplPtr& producer) {
    // Track the producer so broker-initiated commands can be routed to it.
    Lock lock(mutex_);
    producers_.emplace(producerId, producer);
}
void ClientConnection::registerConsumer(int consumerId, const ConsumerImplPtr& consumer) {
    // Track the consumer so broker-initiated commands can be routed to it.
    Lock lock(mutex_);
    consumers_.emplace(consumerId, consumer);
}
void ClientConnection::removeProducer(int producerId) {
    // Stop routing broker commands to this producer id.
    Lock lock(mutex_);
    auto it = producers_.find(producerId);
    if (it != producers_.end()) {
        producers_.erase(it);
    }
}
void ClientConnection::removeConsumer(int consumerId) {
    // Stop routing broker commands to this consumer id.
    Lock lock(mutex_);
    auto it = consumers_.find(consumerId);
    if (it != consumers_.end()) {
        consumers_.erase(it);
    }
}
const std::string& ClientConnection::brokerAddress() const { return physicalAddress_; }
int ClientConnection::getServerProtocolVersion() const { return serverProtocolVersion_; }
int32_t ClientConnection::getMaxMessageSize() { return maxMessageSize_.load(std::memory_order_acquire); }
Commands::ChecksumType ClientConnection::getChecksumType() const {
    // CRC32C checksums are only understood by brokers speaking protocol v6+.
    if (getServerProtocolVersion() >= proto::v6) {
        return Commands::Crc32c;
    }
    return Commands::None;
}
// Ask the broker for the last message id of a consumer's topic. Registers a
// pending request keyed by requestId, then sends GET_LAST_MESSAGE_ID (or lets
// the mock server answer when mocking is enabled).
Future<Result, GetLastMessageIdResponse> ClientConnection::newGetLastMessageId(uint64_t consumerId,
                                                                               uint64_t requestId) {
    Lock lock(mutex_);
    if (isClosed()) {
        lock.unlock();
        LOG_ERROR(cnxString() << " Client is not connected to the broker");
        auto promise = std::make_shared<GetLastMessageIdResponsePromisePtr::element_type>();
        promise->setFailed(ResultNotConnected);
        return promise->getFuture();
    }
    // Timeout callback only logs; the request itself is failed by the generic
    // request-timeout machinery in insertRequest.
    auto request =
        insertRequest(pendingGetLastMessageIdRequests_, requestId, [cnxString = cnxString(), requestId]() {
            LOG_WARN(cnxString << "GetLastMessageId request timeout to broker, req_id: " << requestId);
        });
    lock.unlock();
    if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr &&
        mockServer_->sendRequest("GET_LAST_MESSAGE_ID", requestId)) {
        return request->getFuture();
    }
    sendCommand(Commands::newGetLastMessageId(consumerId, requestId));
    return request->getFuture();
}
// Ask the broker for the topics under a namespace. Registers a pending
// request keyed by requestId, then sends GET_TOPICS_OF_NAMESPACE (or lets the
// mock server answer when mocking is enabled).
Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(
    const std::string& nsName, CommandGetTopicsOfNamespace_Mode mode, uint64_t requestId) {
    Lock lock(mutex_);
    if (isClosed()) {
        lock.unlock();
        LOG_ERROR(cnxString() << "Client is not connected to the broker");
        Promise<Result, NamespaceTopicsPtr> promise;
        promise.setFailed(ResultNotConnected);
        return promise.getFuture();
    }
    auto request =
        insertRequest(pendingGetNamespaceTopicsRequests_, requestId, [cnxString = cnxString(), requestId]() {
            LOG_WARN(cnxString << "GetTopicsOfNamespace request timeout to broker, req_id: " << requestId);
        });
    lock.unlock();
    if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr &&
        mockServer_->sendRequest("GET_TOPICS_OF_NAMESPACE", requestId)) {
        return request->getFuture();
    }
    sendCommand(Commands::newGetTopicsOfNamespace(nsName, mode, requestId));
    return request->getFuture();
}
// Ask the broker for a topic's schema (optionally at a specific version).
// Registers a pending request keyed by requestId, then sends GET_SCHEMA (or
// lets the mock server answer when mocking is enabled).
Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& topicName,
                                                          const std::string& version, uint64_t requestId) {
    Lock lock(mutex_);
    if (isClosed()) {
        lock.unlock();
        LOG_ERROR(cnxString() << "Client is not connected to the broker");
        Promise<Result, SchemaInfo> promise;
        promise.setFailed(ResultNotConnected);
        return promise.getFuture();
    }
    auto request =
        insertRequest(pendingGetSchemaRequests_, requestId, [cnxString = cnxString(), requestId]() {
            LOG_WARN(cnxString << "GetSchema request timeout to broker, req_id: " << requestId);
        });
    lock.unlock();
    if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr &&
        mockServer_->sendRequest("GET_SCHEMA", requestId)) {
        return request->getFuture();
    }
    sendCommand(Commands::newGetSchema(topicName, version, requestId));
    return request->getFuture();
}
void ClientConnection::checkServerError(ServerError error, const std::string& message) {
    // Delegate to the shared adaptor helper (may decide to close this connection).
    pulsar::adaptor::checkServerError(*this, error, message);
}
// Handle a SEND_RECEIPT from the broker: route the ack to the owning
// producer. If the producer cannot process the ack, the connection is closed
// so the producer can recover via reconnection.
void ClientConnection::handleSendReceipt(const proto::CommandSendReceipt& sendReceipt) {
    int producerId = sendReceipt.producer_id();
    uint64_t sequenceId = sendReceipt.sequence_id();
    const proto::MessageIdData& messageIdData = sendReceipt.message_id();
    auto messageId = toMessageId(messageIdData);
    LOG_DEBUG(cnxString() << "Got receipt for producer: " << producerId << " -- msg: " << sequenceId
                          << "-- message id: " << messageId);
    Lock lock(mutex_);
    auto it = producers_.find(producerId);
    if (it != producers_.end()) {
        // Promote the weak_ptr before releasing the lock; the producer may
        // already be destroyed, in which case the receipt is dropped.
        ProducerImplPtr producer = it->second.lock();
        lock.unlock();
        if (producer) {
            if (!producer->ackReceived(sequenceId, messageId)) {
                // If the producer fails to process the ack, we need to close the connection
                // to give it a chance to recover from there
                close(ResultDisconnected);
            }
        }
    } else {
        LOG_ERROR(cnxString() << "Got invalid producer Id in SendReceipt: "  //
                              << producerId << " -- msg: " << sequenceId);
    }
}
// Handle a SEND_ERROR from the broker. Checksum errors are recoverable: the
// producer drops the corrupt message and continues. Any other error (or a
// failed recovery) closes the connection to force a reconnect.
void ClientConnection::handleSendError(const proto::CommandSendError& error) {
    LOG_WARN(cnxString() << "Received send error from server: " << error.message());
    if (ChecksumError == error.error()) {
        long producerId = error.producer_id();
        long sequenceId = error.sequence_id();
        Lock lock(mutex_);
        auto it = producers_.find(producerId);
        if (it != producers_.end()) {
            // Promote the weak_ptr before releasing the lock.
            ProducerImplPtr producer = it->second.lock();
            lock.unlock();
            if (producer) {
                if (!producer->removeCorruptMessage(sequenceId)) {
                    // If the producer fails to remove corrupt msg, we need to close the
                    // connection to give it a chance to recover from there
                    close(ResultDisconnected);
                }
            }
        }
    } else {
        close(ResultDisconnected);
    }
}
void ClientConnection::handleSuccess(const proto::CommandSuccess& success) {
    // Generic SUCCESS response: complete the matching pending request, if any.
    LOG_DEBUG(cnxString() << "Received success response from server. req_id: " << success.request_id());
    Lock lock(mutex_);
    auto it = pendingRequests_.find(success.request_id());
    if (it == pendingRequests_.end()) {
        return;
    }
    auto request = std::move(it->second);
    pendingRequests_.erase(it);
    lock.unlock();
    // Complete outside the lock to avoid re-entrancy while holding mutex_.
    request->complete({});
}
// Handle the response to a PARTITIONED_METADATA lookup: complete the pending
// lookup request with the partition count, or fail it with the broker error
// (possibly closing the connection via checkServerError).
void ClientConnection::handlePartitionedMetadataResponse(
    const proto::CommandPartitionedTopicMetadataResponse& partitionMetadataResponse) {
    LOG_DEBUG(cnxString() << "Received partition-metadata response from server. req_id: "
                          << partitionMetadataResponse.request_id());
    Lock lock(mutex_);
    auto it = pendingLookupRequests_.find(partitionMetadataResponse.request_id());
    if (it != pendingLookupRequests_.end()) {
        auto request = std::move(it->second);
        pendingLookupRequests_.erase(it);
        // Release the pending-lookup slot taken in newLookup().
        numOfPendingLookupRequest_--;
        lock.unlock();
        if (!partitionMetadataResponse.has_response() ||
            (partitionMetadataResponse.response() ==
             proto::CommandPartitionedTopicMetadataResponse::Failed)) {
            if (partitionMetadataResponse.has_error()) {
                LOG_ERROR(cnxString() << "Failed partition-metadata lookup req_id: "
                                      << partitionMetadataResponse.request_id()
                                      << " error: " << partitionMetadataResponse.error()
                                      << " msg: " << partitionMetadataResponse.message());
                // May close the connection for fatal server errors.
                checkServerError(partitionMetadataResponse.error(), partitionMetadataResponse.message());
                request->fail(
                    getResult(partitionMetadataResponse.error(), partitionMetadataResponse.message()));
            } else {
                LOG_ERROR(cnxString() << "Failed partition-metadata lookup req_id: "
                                      << partitionMetadataResponse.request_id() << " with empty response: ");
                request->fail(ResultConnectError);
            }
        } else {
            LookupDataResultPtr lookupResultPtr = std::make_shared<LookupDataResult>();
            lookupResultPtr->setPartitions(partitionMetadataResponse.partitions());
            request->complete(lookupResultPtr);
        }
    } else {
        LOG_WARN("Received unknown request id from server: " << partitionMetadataResponse.request_id());
    }
}
// Handle a CONSUMER_STATS_RESPONSE: resolve the pending stats promise with
// either the broker error or a populated BrokerConsumerStatsImpl.
void ClientConnection::handleConsumerStatsResponse(
    const proto::CommandConsumerStatsResponse& consumerStatsResponse) {
    LOG_DEBUG(cnxString() << "ConsumerStatsResponse command - Received consumer stats "
                             "response from server. req_id: "
                          << consumerStatsResponse.request_id());
    Lock lock(mutex_);
    auto it = pendingConsumerStatsMap_.find(consumerStatsResponse.request_id());
    if (it != pendingConsumerStatsMap_.end()) {
        auto request = std::move(it->second);
        pendingConsumerStatsMap_.erase(it);
        // Resolve the promise outside the lock.
        lock.unlock();
        if (consumerStatsResponse.has_error_code()) {
            if (consumerStatsResponse.has_error_message()) {
                LOG_ERROR(cnxString()
                          << " Failed to get consumer stats - " << consumerStatsResponse.error_message());
            }
            request.setFailed(
                getResult(consumerStatsResponse.error_code(), consumerStatsResponse.error_message()));
        } else {
            LOG_DEBUG(cnxString() << "ConsumerStatsResponse command - Received consumer stats "
                                     "response from server. req_id: "
                                  << consumerStatsResponse.request_id() << " Stats: ");
            // Copy every stats field from the protobuf into the impl struct.
            BrokerConsumerStatsImpl brokerStats(
                consumerStatsResponse.msgrateout(), consumerStatsResponse.msgthroughputout(),
                consumerStatsResponse.msgrateredeliver(), consumerStatsResponse.consumername(),
                consumerStatsResponse.availablepermits(), consumerStatsResponse.unackedmessages(),
                consumerStatsResponse.blockedconsumeronunackedmsgs(), consumerStatsResponse.address(),
                consumerStatsResponse.connectedsince(), consumerStatsResponse.type(),
                consumerStatsResponse.msgrateexpired(), consumerStatsResponse.msgbacklog());
            request.setValue(brokerStats);
        }
    } else {
        LOG_WARN("ConsumerStatsResponse command - Received unknown request id from server: "
                 << consumerStatsResponse.request_id());
    }
}
// Handle the response to a topic LOOKUP: complete the pending lookup request
// with broker urls / redirect info, or fail it with the broker error.
// (NOTE: "Respose" typo is part of the declared interface; kept as-is.)
void ClientConnection::handleLookupTopicRespose(
    const proto::CommandLookupTopicResponse& lookupTopicResponse) {
    Lock lock(mutex_);
    auto it = pendingLookupRequests_.find(lookupTopicResponse.request_id());
    if (it != pendingLookupRequests_.end()) {
        auto request = std::move(it->second);
        pendingLookupRequests_.erase(it);
        // Release the pending-lookup slot taken in newLookup().
        numOfPendingLookupRequest_--;
        lock.unlock();
        if (!lookupTopicResponse.has_response() ||
            (lookupTopicResponse.response() == proto::CommandLookupTopicResponse::Failed)) {
            if (lookupTopicResponse.has_error()) {
                LOG_ERROR(cnxString() << "Failed lookup req_id: " << lookupTopicResponse.request_id()
                                      << " error: " << lookupTopicResponse.error()
                                      << " msg: " << lookupTopicResponse.message());
                // May close the connection for fatal server errors.
                checkServerError(lookupTopicResponse.error(), lookupTopicResponse.message());
                request->fail(getResult(lookupTopicResponse.error(), lookupTopicResponse.message()));
            } else {
                LOG_ERROR(cnxString() << "Failed lookup req_id: " << lookupTopicResponse.request_id()
                                      << " with empty response: ");
                request->fail(ResultConnectError);
            }
        } else {
            LOG_DEBUG(cnxString() << "Received lookup response from server. req_id: "
                                  << lookupTopicResponse.request_id()  //
                                  << " -- broker-url: " << lookupTopicResponse.brokerserviceurl()
                                  << " -- broker-tls-url: "  //
                                  << lookupTopicResponse.brokerserviceurltls()
                                  << " authoritative: " << lookupTopicResponse.authoritative()  //
                                  << " redirect: " << lookupTopicResponse.response());
            LookupDataResultPtr lookupResultPtr = std::make_shared<LookupDataResult>();
            // On a TLS connection, prefer the TLS broker url as the primary.
            if (tlsSocket_) {
                lookupResultPtr->setBrokerUrl(lookupTopicResponse.brokerserviceurltls());
            } else {
                lookupResultPtr->setBrokerUrl(lookupTopicResponse.brokerserviceurl());
            }
            lookupResultPtr->setBrokerUrlTls(lookupTopicResponse.brokerserviceurltls());
            lookupResultPtr->setAuthoritative(lookupTopicResponse.authoritative());
            lookupResultPtr->setRedirect(lookupTopicResponse.response() ==
                                         proto::CommandLookupTopicResponse::Redirect);
            lookupResultPtr->setShouldProxyThroughServiceUrl(lookupTopicResponse.proxy_through_service_url());
            request->complete(lookupResultPtr);
        }
    } else {
        LOG_WARN("Received unknown request id from server: " << lookupTopicResponse.request_id());
    }
}
// Handle a PRODUCER_SUCCESS response. If the producer is only queued at the
// broker (producer_ready == false), keep the request pending (timeout
// disabled) until a final response arrives; otherwise complete it with the
// producer name / sequence id / schema metadata.
void ClientConnection::handleProducerSuccess(const proto::CommandProducerSuccess& producerSuccess) {
    LOG_DEBUG(cnxString() << "Received success producer response from server. req_id: "
                          << producerSuccess.request_id()  //
                          << " -- producer name: " << producerSuccess.producer_name());
    Lock lock(mutex_);
    auto it = pendingRequests_.find(producerSuccess.request_id());
    if (it != pendingRequests_.end()) {
        auto request = it->second;
        if (!producerSuccess.producer_ready()) {
            LOG_INFO(cnxString() << " Producer " << producerSuccess.producer_name()
                                 << " has been queued up at broker. req_id: "
                                 << producerSuccess.request_id());
            // Not a final answer: leave the request in the map and stop its
            // timeout so it can wait indefinitely for the broker.
            request->disableTimeout();
            lock.unlock();
        } else {
            pendingRequests_.erase(it);
            lock.unlock();
            ResponseData data;
            data.producerName = producerSuccess.producer_name();
            data.lastSequenceId = producerSuccess.last_sequence_id();
            if (producerSuccess.has_schema_version()) {
                data.schemaVersion = producerSuccess.schema_version();
            }
            if (producerSuccess.has_topic_epoch()) {
                data.topicEpoch = std::make_optional(producerSuccess.topic_epoch());
            } else {
                data.topicEpoch = std::nullopt;
            }
            request->complete(data);
        }
    }
}
// Handle a generic ERROR response from the broker: find the pending request
// with a matching id across the request maps (generic, get-last-message-id,
// get-topics-of-namespace, in that order) and fail it with the mapped Result.
void ClientConnection::handleError(const proto::CommandError& error) {
    Result result = getResult(error.error(), error.message());
    LOG_WARN(cnxString() << "Received error response from server: " << result
                         << (error.has_message() ? (" (" + error.message() + ")") : "")
                         << " -- req_id: " << error.request_id());
    Lock lock(mutex_);
    auto it = pendingRequests_.find(error.request_id());
    if (it != pendingRequests_.end()) {
        auto request = std::move(it->second);
        pendingRequests_.erase(it);
        // Fail outside the lock to avoid re-entrancy while holding mutex_.
        lock.unlock();
        request->fail(result);
    } else {
        auto it = pendingGetLastMessageIdRequests_.find(error.request_id());
        if (it != pendingGetLastMessageIdRequests_.end()) {
            auto request = std::move(it->second);
            pendingGetLastMessageIdRequests_.erase(it);
            lock.unlock();
            request->fail(result);
        } else {
            auto it = pendingGetNamespaceTopicsRequests_.find(error.request_id());
            if (it != pendingGetNamespaceTopicsRequests_.end()) {
                auto request = std::move(it->second);
                pendingGetNamespaceTopicsRequests_.erase(it);
                lock.unlock();
                request->fail(result);
            } else {
                // Unknown request id: nothing to fail.
                lock.unlock();
            }
        }
    }
}
std::string ClientConnection::getMigratedBrokerServiceUrl(
    const proto::CommandTopicMigrated& commandTopicMigrated) {
    // A TLS connection only honors the TLS url, a plaintext connection only
    // the plain url; missing urls yield an empty string.
    if (tlsSocket_) {
        return commandTopicMigrated.has_brokerserviceurltls() ? commandTopicMigrated.brokerserviceurltls()
                                                              : std::string();
    }
    return commandTopicMigrated.has_brokerserviceurl() ? commandTopicMigrated.brokerserviceurl()
                                                       : std::string();
}
// Handle a TOPIC_MIGRATED command: point the affected producer/consumer at the
// migrated cluster url and drop its first post-connect pending request so the
// reconnect logic can take over.
//
// Fix: the weak_ptr stored in producers_/consumers_ may be expired; the
// original dereferenced the result of lock() unconditionally, which could
// dereference a null shared_ptr. Treat an expired entry like a missing one.
void ClientConnection::handleTopicMigrated(const proto::CommandTopicMigrated& commandTopicMigrated) {
    const long resourceId = commandTopicMigrated.resource_id();
    const std::string migratedBrokerServiceUrl = getMigratedBrokerServiceUrl(commandTopicMigrated);
    if (migratedBrokerServiceUrl.empty()) {
        LOG_WARN("Failed to find the migrated broker url for resource:"
                 << resourceId
                 << (commandTopicMigrated.has_brokerserviceurl()
                         ? ", migratedBrokerUrl: " + commandTopicMigrated.brokerserviceurl()
                         : "")
                 << (commandTopicMigrated.has_brokerserviceurltls()
                         ? ", migratedBrokerUrlTls: " + commandTopicMigrated.brokerserviceurltls()
                         : ""));
        return;
    }
    Lock lock(mutex_);
    if (commandTopicMigrated.resource_type() == proto::CommandTopicMigrated_ResourceType_Producer) {
        auto it = producers_.find(resourceId);
        // Promote the weak_ptr; an expired entry is handled like an unknown id.
        auto producer = (it != producers_.end()) ? it->second.lock() : nullptr;
        if (producer) {
            producer->setRedirectedClusterURI(migratedBrokerServiceUrl);
            unsafeRemovePendingRequest(producer->firstRequestIdAfterConnect());
            LOG_INFO("Producer id:" << resourceId << " is migrated to " << migratedBrokerServiceUrl);
        } else {
            LOG_WARN("Got invalid producer Id in topicMigrated command: " << resourceId);
        }
    } else {
        auto it = consumers_.find(resourceId);
        auto consumer = (it != consumers_.end()) ? it->second.lock() : nullptr;
        if (consumer) {
            consumer->setRedirectedClusterURI(migratedBrokerServiceUrl);
            unsafeRemovePendingRequest(consumer->firstRequestIdAfterConnect());
            LOG_INFO("Consumer id:" << resourceId << " is migrated to " << migratedBrokerServiceUrl);
        } else {
            LOG_WARN("Got invalid consumer Id in topicMigrated command: " << resourceId);
        }
    }
}
optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
    const proto::CommandCloseProducer& closeProducer) {
    // TLS connections only honor the TLS url; plaintext only the plain url.
    // Empty optional when the matching url is absent.
    if (tlsSocket_) {
        if (closeProducer.has_assignedbrokerserviceurltls()) {
            return closeProducer.assignedbrokerserviceurltls();
        }
        return {};
    }
    if (closeProducer.has_assignedbrokerserviceurl()) {
        return closeProducer.assignedbrokerserviceurl();
    }
    return {};
}
optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
    const proto::CommandCloseConsumer& closeConsumer) {
    // TLS connections only honor the TLS url; plaintext only the plain url.
    // Empty optional when the matching url is absent.
    if (tlsSocket_) {
        if (closeConsumer.has_assignedbrokerserviceurltls()) {
            return closeConsumer.assignedbrokerserviceurltls();
        }
        return {};
    }
    if (closeConsumer.has_assignedbrokerserviceurl()) {
        return closeConsumer.assignedbrokerserviceurl();
    }
    return {};
}
// Handle a broker-initiated CLOSE_PRODUCER: detach the producer from this
// connection and tell it to disconnect (optionally with a broker url the
// producer should reconnect to).
void ClientConnection::handleCloseProducer(const proto::CommandCloseProducer& closeProducer) {
    int producerId = closeProducer.producer_id();
    LOG_DEBUG("Broker notification of Closed producer: " << producerId);
    Lock lock(mutex_);
    auto it = producers_.find(producerId);
    if (it != producers_.end()) {
        // Promote the weak_ptr before erasing/unlocking; it may be expired.
        ProducerImplPtr producer = it->second.lock();
        producers_.erase(it);
        lock.unlock();
        if (producer) {
            auto assignedBrokerServiceUrl = getAssignedBrokerServiceUrl(closeProducer);
            producer->disconnectProducer(assignedBrokerServiceUrl);
        }
    } else {
        LOG_ERROR(cnxString() << "Got invalid producer Id in closeProducer command: " << producerId);
    }
}
// Handle a broker-initiated CLOSE_CONSUMER: detach the consumer from this
// connection and tell it to disconnect (optionally with a broker url the
// consumer should reconnect to).
void ClientConnection::handleCloseConsumer(const proto::CommandCloseConsumer& closeconsumer) {
    int consumerId = closeconsumer.consumer_id();
    LOG_DEBUG("Broker notification of Closed consumer: " << consumerId);
    Lock lock(mutex_);
    auto it = consumers_.find(consumerId);
    if (it != consumers_.end()) {
        // Promote the weak_ptr before erasing/unlocking; it may be expired.
        ConsumerImplPtr consumer = it->second.lock();
        consumers_.erase(it);
        lock.unlock();
        if (consumer) {
            auto assignedBrokerServiceUrl = getAssignedBrokerServiceUrl(closeconsumer);
            consumer->disconnectConsumer(assignedBrokerServiceUrl);
        }
    } else {
        LOG_ERROR(cnxString() << "Got invalid consumer Id in closeConsumer command: " << consumerId);
    }
}
// Handle an AUTH_CHALLENGE from the broker: build a fresh AuthResponse from
// the configured authentication provider and write it asynchronously. On
// failure to build the response, close the connection with that result.
void ClientConnection::handleAuthChallenge() {
    LOG_DEBUG(cnxString() << "Received auth challenge from broker");
    Result result;
    SharedBuffer buffer = Commands::newAuthResponse(authentication_, result);
    if (result != ResultOk) {
        LOG_ERROR(cnxString() << "Failed to send auth response: " << result);
        close(result);
        return;
    }
    // Capture `self` and `buffer` so both survive until the async write completes.
    auto self = shared_from_this();
    asyncWrite(buffer.const_asio_buffer(),
               customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& err, size_t) {
                   handleSentAuthResponse(err, buffer);
               }));
}
// Handle a GET_LAST_MESSAGE_ID response: complete the pending request with
// the last message id (and, when provided, the consumer's mark-delete
// position).
void ClientConnection::handleGetLastMessageIdResponse(
    const proto::CommandGetLastMessageIdResponse& getLastMessageIdResponse) {
    LOG_DEBUG(cnxString() << "Received getLastMessageIdResponse from server. req_id: "
                          << getLastMessageIdResponse.request_id());
    Lock lock(mutex_);
    auto it = pendingGetLastMessageIdRequests_.find(getLastMessageIdResponse.request_id());
    if (it != pendingGetLastMessageIdRequests_.end()) {
        auto request = std::move(it->second);
        pendingGetLastMessageIdRequests_.erase(it);
        // Complete outside the lock.
        lock.unlock();
        if (getLastMessageIdResponse.has_consumer_mark_delete_position()) {
            request->complete({toMessageId(getLastMessageIdResponse.last_message_id()),
                               toMessageId(getLastMessageIdResponse.consumer_mark_delete_position())});
        } else {
            request->complete({toMessageId(getLastMessageIdResponse.last_message_id())});
        }
    } else {
        lock.unlock();
        LOG_WARN("getLastMessageIdResponse command - Received unknown request id from server: "
                 << getLastMessageIdResponse.request_id());
    }
}
// Handle a GET_TOPICS_OF_NAMESPACE response: strip the "-partition-..."
// suffix from each topic, de-duplicate, and complete the pending request with
// the sorted unique topic names.
//
// Fixes: `find` returns std::string::size_type; the original narrowed it to
// `int` and relied on -1 round-tripping back to npos through substr's size_t
// parameter. Use the proper type (substr(0, npos) keeps the whole name). The
// explicit duplicate check before insert was redundant: std::set::insert
// already ignores duplicates.
void ClientConnection::handleGetTopicOfNamespaceResponse(
    const proto::CommandGetTopicsOfNamespaceResponse& response) {
    LOG_DEBUG(cnxString() << "Received GetTopicsOfNamespaceResponse from server. req_id: "
                          << response.request_id() << " topicsSize" << response.topics_size());
    Lock lock(mutex_);
    auto it = pendingGetNamespaceTopicsRequests_.find(response.request_id());
    if (it == pendingGetNamespaceTopicsRequests_.end()) {
        lock.unlock();
        LOG_WARN(
            "GetTopicsOfNamespaceResponse command - Received unknown request id from "
            "server: "
            << response.request_id());
        return;
    }
    auto request = std::move(it->second);
    pendingGetNamespaceTopicsRequests_.erase(it);
    // Complete outside the lock.
    lock.unlock();
    std::set<std::string> topicSet;
    for (int i = 0; i < response.topics_size(); i++) {
        const std::string& topicName = response.topics(i);
        // Drop the partition suffix; when absent, pos is npos and substr
        // returns the whole name. The set de-duplicates partitioned topics.
        const std::string::size_type pos = topicName.find("-partition-");
        topicSet.insert(topicName.substr(0, pos));
    }
    NamespaceTopicsPtr topicsPtr =
        std::make_shared<std::vector<std::string>>(topicSet.begin(), topicSet.end());
    request->complete(topicsPtr);
}
// Handle a GET_SCHEMA response: fail the pending request with the broker
// error, or complete it with a SchemaInfo built from the protobuf schema
// (type, data, properties).
void ClientConnection::handleGetSchemaResponse(const proto::CommandGetSchemaResponse& response) {
    LOG_DEBUG(cnxString() << "Received GetSchemaResponse from server. req_id: " << response.request_id());
    Lock lock(mutex_);
    auto it = pendingGetSchemaRequests_.find(response.request_id());
    if (it != pendingGetSchemaRequests_.end()) {
        auto request = std::move(it->second);
        pendingGetSchemaRequests_.erase(it);
        // Complete/fail outside the lock.
        lock.unlock();
        if (response.has_error_code()) {
            Result result = getResult(response.error_code(), response.error_message());
            // TopicNotFound is an expected outcome; don't spam warnings for it.
            if (response.error_code() != proto::TopicNotFound) {
                LOG_WARN(cnxString() << "Received error GetSchemaResponse from server " << result
                                     << (response.has_error_message()
                                             ? (" (" + response.error_message() + ")")
                                             : "")
                                     << " -- req_id: " << response.request_id());
            }
            request->fail(result);
            return;
        }
        // Copy the protobuf property map into a plain StringMap.
        const auto& schema = response.schema();
        const auto& properMap = schema.properties();
        StringMap properties;
        for (auto kv = properMap.begin(); kv != properMap.end(); ++kv) {
            properties[kv->key()] = kv->value();
        }
        SchemaInfo schemaInfo(static_cast<SchemaType>(schema.type()), "", schema.schema_data(), properties);
        request->complete(schemaInfo);
    } else {
        lock.unlock();
        LOG_WARN(
            "GetSchemaResponse command - Received unknown request id from "
            "server: "
            << response.request_id());
    }
}
// Handle an ACK_RESPONSE: fail or complete the matching pending request
// depending on whether the broker reported an error.
void ClientConnection::handleAckResponse(const proto::CommandAckResponse& response) {
    LOG_DEBUG(cnxString() << "Received AckResponse from server. req_id: " << response.request_id());
    Lock lock(mutex_);
    auto it = pendingRequests_.find(response.request_id());
    if (it == pendingRequests_.cend()) {
        lock.unlock();
        LOG_WARN("Cannot find the cached AckResponse whose req_id is " << response.request_id());
        return;
    }
    auto request = std::move(it->second);
    pendingRequests_.erase(it);
    // Complete/fail outside the lock.
    lock.unlock();
    if (response.has_error()) {
        request->fail(getResult(response.error(), ""));
    } else {
        request->complete({});
    }
}
void ClientConnection::unsafeRemovePendingRequest(long requestId) {
    // "unsafe": presumably the caller already holds mutex_ (see
    // handleTopicMigrated) — TODO confirm. Fails and drops the pending
    // request with the given id, if any.
    auto node = pendingRequests_.find(requestId);
    if (node == pendingRequests_.end()) {
        return;
    }
    auto request = std::move(node->second);
    pendingRequests_.erase(node);
    request->fail(ResultDisconnected);
}
} // namespace pulsar