blob: 04202d3151906fd3836bed1ee25ff2747e4772e5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "ClientConnection.h"
#include <pulsar/MessageIdBuilder.h>
#include <boost/optional.hpp>
#include <fstream>
#include "AsioDefines.h"
#include "ClientConnectionAdaptor.h"
#include "ClientImpl.h"
#include "Commands.h"
#include "ConnectionPool.h"
#include "ConsumerImpl.h"
#include "ExecutorService.h"
#include "LogUtils.h"
#include "OpSendMsg.h"
#include "ProducerImpl.h"
#include "PulsarApi.pb.h"
#include "ResultUtils.h"
#include "Url.h"
#include "auth/AuthOauth2.h"
#include "auth/InitialAuthData.h"
#include "checksum/ChecksumProvider.h"
DECLARE_LOG_OBJECT()
using namespace ASIO::ip;
namespace pulsar {
using proto::BaseCommand;
// Initial capacity of the shared read/write buffers; larger frames trigger a one-off
// reallocation in processIncomingBuffer().
static const uint32_t DefaultBufferSize = 64 * 1024;
// Period of the Pulsar-protocol PING probes sent once the connection is Ready.
static const int KeepAliveIntervalInSeconds = 30;
// Convert the wire-level MessageIdData into the public MessageId type.
static MessageId toMessageId(const proto::MessageIdData& messageIdData) {
    auto builder = MessageIdBuilder::from(messageIdData);
    return builder.build();
}
// Convert error codes from protobuf to client API Result
//
// Maps every proto::ServerError value to the closest client-facing Result. Keep this
// switch exhaustive and default-free so that adding a new ServerError produces a
// compiler warning here.
static Result getResult(ServerError serverError, const std::string& message) {
    switch (serverError) {
        case UnknownError:
            return ResultUnknownError;
        case MetadataError:
            return ResultBrokerMetadataError;
        case ChecksumError:
            return ResultChecksumError;
        case PersistenceError:
            return ResultBrokerPersistenceError;
        case AuthenticationError:
            return ResultAuthenticationError;
        case AuthorizationError:
            return ResultAuthorizationError;
        case ConsumerBusy:
            return ResultConsumerBusy;
        case ServiceNotReady:
            // ServiceNotReady is normally transient (retryable), except for the broker's
            // special "no test listener" message, which will never succeed on retry.
            return (message.find("the broker do not have test listener") == std::string::npos)
                       ? ResultRetryable
                       : ResultConnectError;
        case ProducerBlockedQuotaExceededError:
            return ResultProducerBlockedQuotaExceededError;
        case ProducerBlockedQuotaExceededException:
            return ResultProducerBlockedQuotaExceededException;
        case TopicNotFound:
            return ResultTopicNotFound;
        case SubscriptionNotFound:
            return ResultSubscriptionNotFound;
        case ConsumerNotFound:
            return ResultConsumerNotFound;
        case UnsupportedVersionError:
            return ResultUnsupportedVersionError;
        case TooManyRequests:
            return ResultTooManyLookupRequestException;
        case TopicTerminatedError:
            return ResultTopicTerminated;
        case ProducerBusy:
            return ResultProducerBusy;
        case InvalidTopicName:
            return ResultInvalidTopicName;
        case IncompatibleSchema:
            return ResultIncompatibleSchema;
        case ConsumerAssignError:
            return ResultConsumerAssignError;
        case TransactionCoordinatorNotFound:
            return ResultTransactionCoordinatorNotFoundError;
        case InvalidTxnStatus:
            return ResultInvalidTxnStatusError;
        case NotAllowedError:
            return ResultNotAllowedError;
        case TransactionConflict:
            return ResultTransactionConflict;
        case TransactionNotFound:
            return ResultTransactionNotFound;
        case ProducerFenced:
            return ResultProducerFenced;
    }
    // NOTE : Do not add default case in the switch above. In future if we get new cases for
    // ServerError and miss them in the switch above we would like to get notified. Adding
    // return here to make the compiler happy.
    return ResultUnknownError;
}
inline std::ostream& operator<<(std::ostream& os, proto::ServerError error) {
os << getResult(error, "");
return os;
}
// Return true when `path` is non-empty and names a file that can be opened for reading.
static bool file_exists(const std::string& path) {
    return !path.empty() && std::ifstream(path).good();
}
std::atomic<int32_t> ClientConnection::maxMessageSize_{Commands::DefaultMaxMessageSize};
/**
 * Construct a (not yet connected) broker connection.
 *
 * Validates the authentication plugin and, when TLS is enabled, builds the SSL context:
 * trust store, optional SNI proxy, client cert/key (either from the auth plugin or from
 * the client configuration) and hostname verification. Throws a Result value (caught by
 * the pool) on fatal configuration errors.
 */
ClientConnection::ClientConnection(const std::string& logicalAddress, const std::string& physicalAddress,
                                   ExecutorServicePtr executor,
                                   const ClientConfiguration& clientConfiguration,
                                   const AuthenticationPtr& authentication, const std::string& clientVersion,
                                   ConnectionPool& pool, size_t poolIndex)
    : operationsTimeout_(ClientImpl::getOperationTimeout(clientConfiguration)),
      authentication_(authentication),
      serverProtocolVersion_(proto::ProtocolVersion_MIN),
      executor_(executor),
      resolver_(executor_->createTcpResolver()),
      socket_(executor_->createSocket()),
      strand_(ASIO::make_strand(executor_->getIOService().get_executor())),
      logicalAddress_(logicalAddress),
      physicalAddress_(physicalAddress),
      cnxString_("[<none> -> " + physicalAddress + "] "),
      incomingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
      connectTimeoutTask_(
          std::make_shared<PeriodicTask>(*executor_, clientConfiguration.getConnectionTimeout())),
      outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
      consumerStatsRequestTimer_(executor_->createDeadlineTimer()),
      maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()),
      clientVersion_(clientVersion),
      pool_(pool),
      poolIndex_(poolIndex) {
    LOG_INFO(cnxString_ << "Create ClientConnection, timeout=" << clientConfiguration.getConnectionTimeout());
    if (!authentication_) {
        LOG_ERROR("Invalid authentication plugin");
        throw ResultAuthenticationError;
    }
    auto oauth2Auth = std::dynamic_pointer_cast<AuthOauth2>(authentication_);
    if (oauth2Auth) {
        // Configure the TLS trust certs file for Oauth2
        auto authData = std::dynamic_pointer_cast<AuthenticationDataProvider>(
            std::make_shared<InitialAuthData>(clientConfiguration.getTlsTrustCertsFilePath()));
        oauth2Auth->getAuthData(authData);
    }
    if (clientConfiguration.isUseTls()) {
        ASIO::ssl::context ctx(ASIO::ssl::context::tlsv12_client);
        Url serviceUrl;
        Url proxyUrl;
        Url::parse(physicalAddress, serviceUrl);
        proxyServiceUrl_ = clientConfiguration.getProxyServiceUrl();
        proxyProtocol_ = clientConfiguration.getProxyProtocol();
        if (proxyProtocol_ == ClientConfiguration::SNI && !proxyServiceUrl_.empty()) {
            Url::parse(proxyServiceUrl_, proxyUrl);
            isSniProxy_ = true;
            LOG_INFO("Configuring SNI Proxy-url=" << proxyServiceUrl_);
        }
        if (clientConfiguration.isTlsAllowInsecureConnection()) {
            ctx.set_verify_mode(ASIO::ssl::context::verify_none);
            isTlsAllowInsecureConnection_ = true;
        } else {
            ctx.set_verify_mode(ASIO::ssl::context::verify_peer);
            std::string trustCertFilePath = clientConfiguration.getTlsTrustCertsFilePath();
            if (!trustCertFilePath.empty()) {
                if (file_exists(trustCertFilePath)) {
                    ctx.load_verify_file(trustCertFilePath);
                } else {
                    LOG_ERROR(trustCertFilePath << ": No such trustCertFile");
                    throw ResultAuthenticationError;
                }
            } else {
                // Fall back to the system's default CA locations.
                ctx.set_default_verify_paths();
            }
        }
        std::string tlsCertificates = clientConfiguration.getTlsCertificateFilePath();
        std::string tlsPrivateKey = clientConfiguration.getTlsPrivateKeyFilePath();
        AuthenticationDataPtr authData;
        if (authentication_->getAuthData(authData) == ResultOk && authData->hasDataForTls()) {
            // The auth plugin provides the client cert/key; both files must exist.
            tlsCertificates = authData->getTlsCertificates();
            tlsPrivateKey = authData->getTlsPrivateKey();
            if (!file_exists(tlsCertificates)) {
                LOG_ERROR(tlsCertificates << ": No such tlsCertificates");
                throw ResultAuthenticationError;
            }
            // BUG FIX: this second check previously re-validated tlsCertificates, so a
            // missing private key file was never detected here.
            if (!file_exists(tlsPrivateKey)) {
                LOG_ERROR(tlsPrivateKey << ": No such tlsPrivateKey");
                throw ResultAuthenticationError;
            }
            ctx.use_private_key_file(tlsPrivateKey, ASIO::ssl::context::pem);
            ctx.use_certificate_file(tlsCertificates, ASIO::ssl::context::pem);
        } else {
            // Cert/key from the configuration are optional: only load them when both exist.
            if (file_exists(tlsPrivateKey) && file_exists(tlsCertificates)) {
                ctx.use_private_key_file(tlsPrivateKey, ASIO::ssl::context::pem);
                ctx.use_certificate_file(tlsCertificates, ASIO::ssl::context::pem);
            }
        }
        tlsSocket_ = ExecutorService::createTlsSocket(socket_, ctx);
        if (!clientConfiguration.isTlsAllowInsecureConnection() && clientConfiguration.isValidateHostName()) {
            LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":" << serviceUrl.port());
            // When connecting through an SNI proxy the certificate belongs to the proxy host.
            std::string urlHost = isSniProxy_ ? proxyUrl.host() : serviceUrl.host();
            tlsSocket_->set_verify_callback(ASIO::ssl::rfc2818_verification(urlHost));
        }
        LOG_DEBUG("TLS SNI Host: " << serviceUrl.host());
        if (!SSL_set_tlsext_host_name(tlsSocket_->native_handle(), serviceUrl.host().c_str())) {
            ASIO_ERROR ec{static_cast<int>(::ERR_get_error()), ASIO::error::get_ssl_category()};
            LOG_ERROR(ec.message() << ": Error while setting TLS SNI");
            return;
        }
    }
}
// Destructor only logs; all sockets/timers are owned via shared_ptr members and are
// released by their own destructors (RAII).
ClientConnection::~ClientConnection() {
    LOG_INFO(cnxString_ << "Destroyed connection to " << logicalAddress_ << "-" << poolIndex_);
}
// Handle the broker's CONNECTED response: record negotiated settings, transition the
// connection to Ready, arm the protocol-level keep-alive timer, and fulfill the
// connect promise (outside the lock).
void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdConnected) {
    if (!cmdConnected.has_server_version()) {
        LOG_ERROR(cnxString_ << "Server version is not set");
        close();
        return;
    }
    if (cmdConnected.has_max_message_size()) {
        // The broker may advertise a max message size different from the default.
        LOG_DEBUG("Connection has max message size setting: " << cmdConnected.max_message_size());
        maxMessageSize_.store(cmdConnected.max_message_size(), std::memory_order_release);
        LOG_DEBUG("Current max message size is: " << maxMessageSize_);
    }
    Lock lock(mutex_);
    if (isClosed()) {
        LOG_INFO(cnxString_ << "Connection already closed");
        return;
    }
    state_ = Ready;
    connectTimeoutTask_->stop();
    serverProtocolVersion_ = cmdConnected.protocol_version();
    if (serverProtocolVersion_ >= proto::v1) {
        // Only send keep-alive probes if the broker supports it
        keepAliveTimer_ = executor_->createDeadlineTimer();
        if (keepAliveTimer_) {
            keepAliveTimer_->expires_from_now(std::chrono::seconds(KeepAliveIntervalInSeconds));
            // Capture weakly: the timer must not keep the connection alive.
            auto weakSelf = weak_from_this();
            keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
                auto self = weakSelf.lock();
                if (self) {
                    self->handleKeepAliveTimeout();
                }
            });
        }
    }
    // Fulfill the promise outside the lock: completion callbacks may call back into us.
    lock.unlock();
    connectPromise_.setValue(shared_from_this());
    if (serverProtocolVersion_ >= proto::v8) {
        // Brokers >= v8 support CONSUMER_STATS; start the periodic request/timeout loop.
        startConsumerStatsTimer(std::vector<uint64_t>());
    }
}
/**
 * Time out the given pending consumer-stats requests and re-arm the stats timer.
 *
 * Each request id in `consumerStatsRequests` that is still pending is removed from
 * pendingConsumerStatsMap_ and its promise is failed with ResultTimeout (outside the
 * lock). The timer is then re-armed with the ids that are currently pending, so they
 * get timed out on the next tick if no response arrives.
 */
void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerStatsRequests) {
    std::vector<Promise<Result, BrokerConsumerStatsImpl>> consumerStatsPromises;
    Lock lock(mutex_);
    for (size_t i = 0; i < consumerStatsRequests.size(); i++) {
        PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap_.find(consumerStatsRequests[i]);
        if (it != pendingConsumerStatsMap_.end()) {
            LOG_DEBUG(cnxString_ << " removing request_id " << it->first
                                 << " from the pendingConsumerStatsMap_");
            consumerStatsPromises.push_back(it->second);
            pendingConsumerStatsMap_.erase(it);
        } else {
            // BUG FIX: this branch previously logged `it->first`, dereferencing the
            // end() iterator; log the request id we actually looked up.
            LOG_DEBUG(cnxString_ << "request_id " << consumerStatsRequests[i]
                                 << " already fulfilled - not removing it");
        }
    }
    // Re-arm the timer with whatever is pending now (reuse the by-value parameter).
    consumerStatsRequests.clear();
    for (PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap_.begin();
         it != pendingConsumerStatsMap_.end(); ++it) {
        consumerStatsRequests.push_back(it->first);
    }
    // If the close operation has reset the consumerStatsRequestTimer_ then the use_count will be zero
    // Check if we have a timer still before we set the request timer to pop again.
    if (consumerStatsRequestTimer_) {
        consumerStatsRequestTimer_->expires_from_now(operationsTimeout_);
        auto weakSelf = weak_from_this();
        consumerStatsRequestTimer_->async_wait([weakSelf, consumerStatsRequests](const ASIO_ERROR& err) {
            auto self = weakSelf.lock();
            if (self) {
                self->handleConsumerStatsTimeout(err, consumerStatsRequests);
            }
        });
    }
    lock.unlock();
    // Complex logic since promises need to be fulfilled outside the lock
    for (size_t i = 0; i < consumerStatsPromises.size(); i++) {
        consumerStatsPromises[i].setFailed(ResultTimeout);
        LOG_WARN(cnxString_ << " Operation timedout, didn't get response from broker");
    }
}
/// The number of unacknowledged probes to send before considering the connection dead and notifying the
/// application layer
typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPCNT> tcp_keep_alive_count;
/// The interval between subsequential keepalive probes, regardless of what the connection has exchanged in
/// the meantime
typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPINTVL> tcp_keep_alive_interval;
/// The interval between the last data packet sent (simple ACKs are not considered data) and the first
/// keepalive
/// probe; after the connection is marked to need keepalive, this counter is not used any further
// NOTE: macOS spells the idle-time option TCP_KEEPALIVE instead of TCP_KEEPIDLE.
#ifdef __APPLE__
typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPALIVE> tcp_keep_alive_idle;
#else
typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPIDLE> tcp_keep_alive_idle;
#endif
/*
* TCP Connect handler
*
* if async_connect without any error, connected_ would be set to true
* at this point the connection is deemed valid to be used by clients of this class
*/
/*
 * TCP Connect handler
 *
 * On success: configures TCP options (no-delay, OS keepalive), then either starts the
 * TLS handshake or proceeds directly to the Pulsar-protocol handshake.
 * On failure: retries the next resolved endpoint, or closes the connection (retryable
 * unless the connect attempt timed out).
 */
void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) {
    if (!err) {
        std::stringstream cnxStringStream;
        try {
            cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint()
                            << "] ";
            cnxString_ = cnxStringStream.str();
        } catch (const ASIO_SYSTEM_ERROR& e) {
            // The peer may already have dropped the connection; treat as retryable.
            LOG_ERROR("Failed to get endpoint: " << e.what());
            close(ResultRetryable);
            return;
        }
        if (logicalAddress_ == physicalAddress_) {
            LOG_INFO(cnxString_ << "Connected to broker");
        } else {
            LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_
                                << ", proxy: " << proxyServiceUrl_
                                << ", physical address:" << physicalAddress_);
        }
        Lock lock(mutex_);
        if (isClosed()) {
            LOG_INFO(cnxString_ << "Connection already closed");
            return;
        }
        state_ = TcpConnected;
        lock.unlock();
        ASIO_ERROR error;
        socket_->set_option(tcp::no_delay(true), error);
        if (error) {
            LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << error.message());
        }
        socket_->set_option(tcp::socket::keep_alive(true), error);
        if (error) {
            LOG_WARN(cnxString_ << "Socket failed to set tcp::socket::keep_alive: " << error.message());
        }
        // Start TCP keep-alive probes after connection has been idle after 1 minute. Ideally this
        // should never happen, given that we're sending our own keep-alive probes (within the TCP
        // connection) every 30 seconds
        socket_->set_option(tcp_keep_alive_idle(1 * 60), error);
        if (error) {
            LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_idle: " << error.message());
        }
        // Send up to 10 probes before declaring the connection broken
        socket_->set_option(tcp_keep_alive_count(10), error);
        if (error) {
            LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_count: " << error.message());
        }
        // Interval between probes: 6 seconds
        socket_->set_option(tcp_keep_alive_interval(6), error);
        if (error) {
            LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_interval: " << error.message());
        }
        if (tlsSocket_) {
            if (!isTlsAllowInsecureConnection_) {
                // Sanity-check that the physical address is a well-formed URL before
                // handshaking against it.
                // BUG FIX: the failure log previously printed a default-constructed
                // ASIO_ERROR (always "success"), which was misleading.
                Url service_url;
                if (!Url::parse(physicalAddress_, service_url)) {
                    LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << physicalAddress_);
                    close();
                    return;
                }
            }
            auto weakSelf = weak_from_this();
            auto socket = socket_;
            auto tlsSocket = tlsSocket_;
            // socket and ssl::stream objects must exist until async_handshake is done, otherwise segmentation
            // fault might happen
            auto callback = [weakSelf, socket, tlsSocket](const ASIO_ERROR& err) {
                auto self = weakSelf.lock();
                if (self) {
                    self->handleHandshake(err);
                }
            };
            tlsSocket_->async_handshake(ASIO::ssl::stream<tcp::socket>::client,
                                        ASIO::bind_executor(strand_, callback));
        } else {
            // Plain-text connection: go straight to the Pulsar protocol handshake.
            handleHandshake(ASIO_SUCCESS);
        }
    } else if (endpointIterator != tcp::resolver::iterator()) {
        LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message());
        // The connection failed. Try the next endpoint in the list.
        ASIO_ERROR closeError;
        socket_->close(closeError);  // ignore the error of close
        if (closeError) {
            // BUG FIX: previously logged the connect error instead of the close error.
            LOG_WARN(cnxString_ << "Failed to close socket: " << closeError.message());
        }
        connectTimeoutTask_->stop();
        ++endpointIterator;
        if (endpointIterator != tcp::resolver::iterator()) {
            LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
            connectTimeoutTask_->start();
            tcp::endpoint endpoint = *endpointIterator;
            auto weakSelf = weak_from_this();
            socket_->async_connect(endpoint, [weakSelf, endpointIterator](const ASIO_ERROR& err) {
                auto self = weakSelf.lock();
                if (self) {
                    self->handleTcpConnected(err, endpointIterator);
                }
            });
        } else {
            if (err == ASIO::error::operation_aborted) {
                // TCP connect timeout, which is not retryable
                close();
            } else {
                close(ResultRetryable);
            }
        }
    } else {
        LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
        close(ResultRetryable);
    }
}
// TLS handshake completion handler (also called directly with ASIO_SUCCESS for
// plain-text connections). On success, builds and sends the Pulsar CONNECT command.
void ClientConnection::handleHandshake(const ASIO_ERROR& err) {
    if (err) {
        if (err.value() == ASIO::ssl::error::stream_truncated) {
            // Peer closed mid-handshake; worth retrying on a fresh connection.
            LOG_WARN(cnxString_ << "Handshake failed: " << err.message());
            close(ResultRetryable);
        } else {
            LOG_ERROR(cnxString_ << "Handshake failed: " << err.message());
            close();
        }
        return;
    }
    // Brokers behind a proxy need the original (logical) address in the CONNECT command.
    bool connectingThroughProxy = logicalAddress_ != physicalAddress_;
    Result result = ResultOk;
    SharedBuffer buffer = Commands::newConnect(authentication_, logicalAddress_, connectingThroughProxy,
                                               clientVersion_, result);
    if (result != ResultOk) {
        LOG_ERROR(cnxString_ << "Failed to establish connection: " << result);
        close(result);
        return;
    }
    // Send CONNECT command to broker
    // Keep `self` and `buffer` alive until the async write completes.
    auto self = shared_from_this();
    asyncWrite(buffer.const_asio_buffer(),
               customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& err, size_t) {
                   handleSentPulsarConnect(err, buffer);
               }));
}
// Completion handler for the CONNECT write: on success, start reading the broker's
// CONNECTED response; on failure, tear the connection down.
void ClientConnection::handleSentPulsarConnect(const ASIO_ERROR& err, const SharedBuffer& buffer) {
    if (isClosed()) {
        return;
    }
    if (!err) {
        // Schedule the reading of CONNECTED command from broker
        readNextCommand();
        return;
    }
    LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
    close();
}
// Completion handler for an AUTH_RESPONSE write; nothing to do on success, close on
// failure.
void ClientConnection::handleSentAuthResponse(const ASIO_ERROR& err, const SharedBuffer& buffer) {
    if (isClosed()) {
        return;
    }
    if (!err) {
        return;
    }
    LOG_WARN(cnxString_ << "Failed to send auth response: " << err.message());
    close();
}
/*
* Async method to establish TCP connection with broker
*
* tcpConnectCompletionHandler is notified when the result of this call is available.
*
*/
/*
 * Async method to establish TCP connection with broker
 *
 * Validates and resolves the target host (the SNI proxy URL when configured, the
 * physical broker address otherwise), then kicks off handleResolve() with the
 * resolved endpoints.
 */
void ClientConnection::tcpConnectAsync() {
    if (isClosed()) {
        return;
    }
    std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : physicalAddress_;
    Url service_url;
    if (!Url::parse(hostUrl, service_url)) {
        // BUG FIX: this log previously printed a default-constructed ASIO_ERROR (always
        // "success"); print the URL that failed to parse instead.
        LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << hostUrl);
        close();
        return;
    }
    if (service_url.protocol() != "pulsar" && service_url.protocol() != "pulsar+ssl") {
        LOG_ERROR(cnxString_ << "Invalid Url protocol '" << service_url.protocol()
                             << "'. Valid values are 'pulsar' and 'pulsar+ssl'");
        close();
        return;
    }
    LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << service_url.port());
    tcp::resolver::query query(service_url.host(), std::to_string(service_url.port()));
    // Capture weakly so a pending resolve cannot keep a closed connection alive.
    auto weakSelf = weak_from_this();
    resolver_->async_resolve(query, [weakSelf](const ASIO_ERROR& err, tcp::resolver::iterator iterator) {
        auto self = weakSelf.lock();
        if (self) {
            self->handleResolve(err, iterator);
        }
    });
}
/**
 * DNS-resolution completion handler.
 *
 * On success, installs the connect-timeout watchdog and starts an async connect to the
 * first resolved endpoint (handleTcpConnected walks the remaining endpoints on failure).
 */
void ClientConnection::handleResolve(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) {
    if (err) {
        // BUG FIX: the ternary was inverted, logging an empty proxy URL in the common
        // (non-SNI) case; log the host that actually failed to resolve.
        std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : physicalAddress_;
        LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message());
        close();
        return;
    }
    auto weakSelf = weak_from_this();
    connectTimeoutTask_->setCallback([weakSelf](const PeriodicTask::ErrorCode& ec) {
        ClientConnectionPtr ptr = weakSelf.lock();
        if (!ptr) {
            // Connection was already destroyed
            return;
        }
        if (ptr->state_ != Ready) {
            // Still not Ready when the watchdog fired: abort the in-flight connect by
            // closing the socket (which cancels pending async operations).
            LOG_ERROR(ptr->cnxString_ << "Connection was not established in "
                                      << ptr->connectTimeoutTask_->getPeriodMs() << " ms, close the socket");
            PeriodicTask::ErrorCode err;
            ptr->socket_->close(err);
            if (err) {
                LOG_WARN(ptr->cnxString_ << "Failed to close socket: " << err.message());
            }
        }
        ptr->connectTimeoutTask_->stop();
    });
    if (endpointIterator != tcp::resolver::iterator()) {
        // BUG FIX: the "Connecting to" log and the watchdog start were previously done
        // before this validity check, dereferencing a possibly-end iterator.
        LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name()  //
                             << " to " << endpointIterator->endpoint());
        LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
        connectTimeoutTask_->start();
        socket_->async_connect(*endpointIterator, [weakSelf, endpointIterator](const ASIO_ERROR& err) {
            auto self = weakSelf.lock();
            if (self) {
                self->handleTcpConnected(err, endpointIterator);
            }
        });
    } else {
        LOG_WARN(cnxString_ << "No IP address found");
        close();
        return;
    }
}
void ClientConnection::readNextCommand() {
const static uint32_t minReadSize = sizeof(uint32_t);
auto self = shared_from_this();
asyncReceive(incomingBuffer_.asio_buffer(),
customAllocReadHandler([this, self](const ASIO_ERROR& err, size_t bytesTransferred) {
handleRead(err, bytesTransferred, minReadSize);
}));
}
// Socket-read completion handler. Accumulates bytes until at least `minReadSize` are
// available, re-issuing partial reads into a slice of the buffer, then hands the data
// to processIncomingBuffer().
void ClientConnection::handleRead(const ASIO_ERROR& err, size_t bytesTransferred, uint32_t minReadSize) {
    if (isClosed()) {
        return;
    }
    // Update buffer write idx with new data
    incomingBuffer_.bytesWritten(bytesTransferred);
    if (err || bytesTransferred == 0) {
        if (err == ASIO::error::operation_aborted) {
            LOG_DEBUG(cnxString_ << "Read operation was canceled: " << err.message());
        } else if (bytesTransferred == 0 || err == ASIO::error::eof) {
            LOG_DEBUG(cnxString_ << "Server closed the connection: " << err.message());
        } else {
            LOG_ERROR(cnxString_ << "Read operation failed: " << err.message());
        }
        close(ResultDisconnected);
    } else if (bytesTransferred < minReadSize) {
        // Read the remaining part, use a slice of buffer to write on the next
        // region
        SharedBuffer buffer = incomingBuffer_.slice(bytesTransferred);
        auto self = shared_from_this();
        // Recurse (asynchronously) with the reduced requirement until satisfied.
        auto nextMinReadSize = minReadSize - bytesTransferred;
        asyncReceive(buffer.asio_buffer(),
                     customAllocReadHandler(
                         [this, self, nextMinReadSize](const ASIO_ERROR& err, size_t bytesTransferred) {
                             handleRead(err, bytesTransferred, nextMinReadSize);
                         }));
    } else {
        processIncomingBuffer();
    }
}
// Parse and dispatch every complete frame currently in incomingBuffer_, then schedule
// the next read. Frame layout: [4B total size][4B cmd size][BaseCommand]
// and, for MESSAGE commands: [optional broker entry metadata][optional checksum]
// [4B metadata size][MessageMetadata][payload].
void ClientConnection::processIncomingBuffer() {
    // Process all the available frames from the incoming buffer
    while (incomingBuffer_.readableBytes() >= sizeof(uint32_t)) {
        // Extract message frames from incoming buffer
        // At this point we have at least 4 bytes in the buffer
        uint32_t frameSize = incomingBuffer_.readUnsignedInt();
        if (frameSize > incomingBuffer_.readableBytes()) {
            // We don't have the entire frame yet
            const uint32_t bytesToReceive = frameSize - incomingBuffer_.readableBytes();
            // Rollback the reading of frameSize (when the frame will be complete,
            // we'll read it again
            incomingBuffer_.rollback(sizeof(uint32_t));
            if (bytesToReceive > incomingBuffer_.writableBytes()) {
                // Need to allocate a buffer big enough for the frame
                uint32_t newBufferSize = std::max<uint32_t>(DefaultBufferSize, frameSize + sizeof(uint32_t));
                incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_, newBufferSize);
            }
            // Resume reading; handleRead() calls back here once the frame is complete.
            auto self = shared_from_this();
            asyncReceive(incomingBuffer_.asio_buffer(),
                         customAllocReadHandler(
                             [this, self, bytesToReceive](const ASIO_ERROR& err, size_t bytesTransferred) {
                                 handleRead(err, bytesTransferred, bytesToReceive);
                             }));
            return;
        }
        // At this point, we have at least one complete frame available in the buffer
        uint32_t cmdSize = incomingBuffer_.readUnsignedInt();
        proto::BaseCommand incomingCmd;
        if (!incomingCmd.ParseFromArray(incomingBuffer_.data(), cmdSize)) {
            LOG_ERROR(cnxString_ << "Error parsing protocol buffer command");
            close(ResultDisconnected);
            return;
        }
        incomingBuffer_.consume(cmdSize);
        if (incomingCmd.type() == BaseCommand::MESSAGE) {
            // Parse message metadata and extract payload
            proto::MessageMetadata msgMetadata;
            proto::BrokerEntryMetadata brokerEntryMetadata;
            // read checksum
            // Bytes of the frame left after the command (the +4 is the cmdSize field).
            uint32_t remainingBytes = frameSize - (cmdSize + 4);
            auto readerIndex = incomingBuffer_.readerIndex();
            if (incomingBuffer_.readUnsignedShort() == Commands::magicBrokerEntryMetadata) {
                // broker entry metadata is present
                uint32_t brokerEntryMetadataSize = incomingBuffer_.readUnsignedInt();
                if (!brokerEntryMetadata.ParseFromArray(incomingBuffer_.data(), brokerEntryMetadataSize)) {
                    LOG_ERROR(cnxString_ << "[consumer id " << incomingCmd.message().consumer_id()
                                         << ", message ledger id "
                                         << incomingCmd.message().message_id().ledgerid() << ", entry id "
                                         << incomingCmd.message().message_id().entryid()
                                         << "] Error parsing broker entry metadata");
                    close(ResultDisconnected);
                    return;
                }
                // Skip past magic (2B) + size field (4B) + the metadata itself.
                incomingBuffer_.setReaderIndex(readerIndex + 2 + 4 + brokerEntryMetadataSize);
                remainingBytes -= (2 + 4 + brokerEntryMetadataSize);
            } else {
                // No broker entry metadata: rewind the speculative magic read.
                incomingBuffer_.setReaderIndex(readerIndex);
            }
            bool isChecksumValid = verifyChecksum(incomingBuffer_, remainingBytes, incomingCmd);
            uint32_t metadataSize = incomingBuffer_.readUnsignedInt();
            if (!msgMetadata.ParseFromArray(incomingBuffer_.data(), metadataSize)) {
                LOG_ERROR(cnxString_ << "[consumer id " << incomingCmd.message().consumer_id()  //
                                     << ", message ledger id "
                                     << incomingCmd.message().message_id().ledgerid()  //
                                     << ", entry id " << incomingCmd.message().message_id().entryid()
                                     << "] Error parsing message metadata");
                close(ResultDisconnected);
                return;
            }
            incomingBuffer_.consume(metadataSize);
            remainingBytes -= (4 + metadataSize);
            // Whatever remains of the frame is the message payload; copy it out so the
            // shared incoming buffer can be reused.
            uint32_t payloadSize = remainingBytes;
            SharedBuffer payload = SharedBuffer::copy(incomingBuffer_.data(), payloadSize);
            incomingBuffer_.consume(payloadSize);
            handleIncomingMessage(incomingCmd.message(), isChecksumValid, brokerEntryMetadata, msgMetadata,
                                  payload);
        } else {
            handleIncomingCommand(incomingCmd);
        }
    }
    if (incomingBuffer_.readableBytes() > 0) {
        // We still have 1 to 3 bytes from the next frame
        assert(incomingBuffer_.readableBytes() < sizeof(uint32_t));
        // Restart with a new buffer and copy the few bytes at the beginning
        incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_, DefaultBufferSize);
        // At least we need to read 4 bytes to have the complete frame size
        uint32_t minReadSize = sizeof(uint32_t) - incomingBuffer_.readableBytes();
        auto self = shared_from_this();
        asyncReceive(
            incomingBuffer_.asio_buffer(),
            customAllocReadHandler([this, self, minReadSize](const ASIO_ERROR& err, size_t bytesTransferred) {
                handleRead(err, bytesTransferred, minReadSize);
            }));
        return;
    }
    // We have read everything we had in the buffer
    // Rollback the indexes to reuse the same buffer
    incomingBuffer_.reset();
    readNextCommand();
}
bool ClientConnection::verifyChecksum(SharedBuffer& incomingBuffer_, uint32_t& remainingBytes,
proto::BaseCommand& incomingCmd) {
int readerIndex = incomingBuffer_.readerIndex();
bool isChecksumValid = true;
if (incomingBuffer_.readUnsignedShort() == Commands::magicCrc32c) {
uint32_t storedChecksum = incomingBuffer_.readUnsignedInt();
remainingBytes -= (2 + 4) /* subtract size of checksum itself */;
// compute metadata-payload checksum
int metadataPayloadSize = remainingBytes;
uint32_t computedChecksum = computeChecksum(0, incomingBuffer_.data(), metadataPayloadSize);
// verify checksum
isChecksumValid = (storedChecksum == computedChecksum);
if (!isChecksumValid) {
LOG_ERROR("[consumer id "
<< incomingCmd.message().consumer_id() //
<< ", message ledger id " << incomingCmd.message().message_id().ledgerid() //
<< ", entry id " << incomingCmd.message().message_id().entryid() //
<< "stored-checksum" << storedChecksum << "computedChecksum" << computedChecksum //
<< "] Checksum verification failed");
}
} else {
incomingBuffer_.setReaderIndex(readerIndex);
}
return isChecksumValid;
}
// Forward an ACTIVE_CONSUMER_CHANGE notification to the registered consumer, dropping
// the lock before invoking the consumer callback.
void ClientConnection::handleActiveConsumerChange(const proto::CommandActiveConsumerChange& change) {
    LOG_DEBUG(cnxString_ << "Received notification about active consumer change, consumer_id: "
                         << change.consumer_id() << " isActive: " << change.is_active());
    Lock lock(mutex_);
    auto iter = consumers_.find(change.consumer_id());
    if (iter == consumers_.end()) {
        LOG_DEBUG(cnxString_ << "Got invalid consumer Id in " << change.consumer_id()
                             << " -- isActive: " << change.is_active());
        return;
    }
    ConsumerImplPtr consumer = iter->second.lock();
    if (!consumer) {
        // The consumer was destroyed; clean up its stale registration.
        consumers_.erase(change.consumer_id());
        LOG_DEBUG(cnxString_ << "Ignoring incoming message for already destroyed consumer "
                             << change.consumer_id());
        return;
    }
    // Release the lock before calling into the consumer to avoid deadlocks.
    lock.unlock();
    consumer->activeConsumerChanged(change.is_active());
}
// Dispatch a received MESSAGE frame to the consumer registered under its consumer id.
// The mutex is released before invoking the consumer callback; stale (destroyed)
// consumer registrations are cleaned up lazily here.
void ClientConnection::handleIncomingMessage(const proto::CommandMessage& msg, bool isChecksumValid,
                                             proto::BrokerEntryMetadata& brokerEntryMetadata,
                                             proto::MessageMetadata& msgMetadata, SharedBuffer& payload) {
    LOG_DEBUG(cnxString_ << "Received a message from the server for consumer: " << msg.consumer_id());
    Lock lock(mutex_);
    ConsumersMap::iterator it = consumers_.find(msg.consumer_id());
    if (it != consumers_.end()) {
        // consumers_ stores weak_ptrs; lock() fails if the consumer was destroyed.
        ConsumerImplPtr consumer = it->second.lock();
        if (consumer) {
            // Unlock the mutex before notifying the consumer of the
            // new received message
            lock.unlock();
            consumer->messageReceived(shared_from_this(), msg, isChecksumValid, brokerEntryMetadata,
                                      msgMetadata, payload);
        } else {
            consumers_.erase(msg.consumer_id());
            LOG_DEBUG(cnxString_ << "Ignoring incoming message for already destroyed consumer "
                                 << msg.consumer_id());
        }
    } else {
        LOG_DEBUG(cnxString_ << "Got invalid consumer Id in "  //
                             << msg.consumer_id() << " -- msg: " << msgMetadata.sequence_id());
    }
}
// Central dispatcher for every non-MESSAGE command received from the broker.
// Behavior depends on the connection state: only CONNECTED is accepted while
// TcpConnected; the full command set is handled once Ready.
void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
    LOG_DEBUG(cnxString_ << "Handling incoming command: " << Commands::messageType(incomingCmd.type()));
    switch (state_.load()) {
        case Pending: {
            LOG_ERROR(cnxString_ << "Connection is not ready yet");
            break;
        }
        case TcpConnected: {
            // Handle Pulsar Connected
            if (incomingCmd.type() != BaseCommand::CONNECTED) {
                // Wrong cmd
                close();
            } else {
                handlePulsarConnected(incomingCmd.connected());
            }
            break;
        }
        case Disconnected: {
            LOG_ERROR(cnxString_ << "Connection already disconnected");
            break;
        }
        case Ready: {
            // Since we are receiving data from the connection, we are assuming that for now the
            // connection is still working well.
            havePendingPingRequest_ = false;
            // Handle normal commands
            switch (incomingCmd.type()) {
                case BaseCommand::SEND_RECEIPT:
                    handleSendReceipt(incomingCmd.send_receipt());
                    break;
                case BaseCommand::SEND_ERROR:
                    handleSendError(incomingCmd.send_error());
                    break;
                case BaseCommand::SUCCESS:
                    handleSuccess(incomingCmd.success());
                    break;
                case BaseCommand::PARTITIONED_METADATA_RESPONSE:
                    handlePartitionedMetadataResponse(incomingCmd.partitionmetadataresponse());
                    break;
                case BaseCommand::CONSUMER_STATS_RESPONSE:
                    handleConsumerStatsResponse(incomingCmd.consumerstatsresponse());
                    break;
                case BaseCommand::LOOKUP_RESPONSE:
                    handleLookupTopicRespose(incomingCmd.lookuptopicresponse());
                    break;
                case BaseCommand::PRODUCER_SUCCESS:
                    handleProducerSuccess(incomingCmd.producer_success());
                    break;
                case BaseCommand::ERROR:
                    handleError(incomingCmd.error());
                    break;
                case BaseCommand::TOPIC_MIGRATED:
                    handleTopicMigrated(incomingCmd.topicmigrated());
                    break;
                case BaseCommand::CLOSE_PRODUCER:
                    handleCloseProducer(incomingCmd.close_producer());
                    break;
                case BaseCommand::CLOSE_CONSUMER:
                    handleCloseConsumer(incomingCmd.close_consumer());
                    break;
                case BaseCommand::PING:
                    // Respond to ping request
                    LOG_DEBUG(cnxString_ << "Replying to ping command");
                    sendCommand(Commands::newPong());
                    break;
                case BaseCommand::PONG:
                    // Reply to our own keep-alive probe; nothing to do beyond the
                    // havePendingPingRequest_ reset above.
                    LOG_DEBUG(cnxString_ << "Received response to ping message");
                    break;
                case BaseCommand::AUTH_CHALLENGE:
                    handleAuthChallenge();
                    break;
                case BaseCommand::ACTIVE_CONSUMER_CHANGE:
                    handleActiveConsumerChange(incomingCmd.active_consumer_change());
                    break;
                case BaseCommand::GET_LAST_MESSAGE_ID_RESPONSE:
                    handleGetLastMessageIdResponse(incomingCmd.getlastmessageidresponse());
                    break;
                case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE:
                    handleGetTopicOfNamespaceResponse(incomingCmd.gettopicsofnamespaceresponse());
                    break;
                case BaseCommand::GET_SCHEMA_RESPONSE:
                    handleGetSchemaResponse(incomingCmd.getschemaresponse());
                    break;
                case BaseCommand::ACK_RESPONSE:
                    handleAckResponse(incomingCmd.ackresponse());
                    break;
                default:
                    // Unknown command types indicate a protocol error; drop the connection.
                    LOG_WARN(cnxString_ << "Received invalid message from server");
                    close(ResultDisconnected);
                    break;
            }
        }
    }
}
/**
 * Request broker-side stats for the given consumer.
 *
 * Registers a promise under `requestId` (fulfilled by handleConsumerStatsResponse or
 * timed out by startConsumerStatsTimer) and sends a CONSUMER_STATS command.
 * Fails immediately with ResultNotConnected when the connection is closed.
 */
Future<Result, BrokerConsumerStatsImpl> ClientConnection::newConsumerStats(uint64_t consumerId,
                                                                           uint64_t requestId) {
    Lock lock(mutex_);
    Promise<Result, BrokerConsumerStatsImpl> promise;
    if (isClosed()) {
        lock.unlock();
        LOG_ERROR(cnxString_ << " Client is not connected to the broker");
        promise.setFailed(ResultNotConnected);
        // BUG FIX: previously fell through here, mutating pendingConsumerStatsMap_
        // after releasing the lock and sending a command on a closed connection.
        return promise.getFuture();
    }
    pendingConsumerStatsMap_.insert(std::make_pair(requestId, promise));
    lock.unlock();
    sendCommand(Commands::newConsumerStats(consumerId, requestId));
    return promise.getFuture();
}
/// Issue a topic LOOKUP request; completion is reported through `promise`.
void ClientConnection::newTopicLookup(const std::string& topicName, bool authoritative,
                                      const std::string& listenerName, const uint64_t requestId,
                                      LookupDataResultPromisePtr promise) {
    // Build the LOOKUP command and hand it to the common lookup path, which
    // handles pending-request bookkeeping and the operation timeout.
    auto lookupCmd = Commands::newLookup(topicName, authoritative, requestId, listenerName);
    newLookup(lookupCmd, requestId, promise);
}
/// Issue a PARTITIONED_METADATA request; completion is reported through `promise`.
void ClientConnection::newPartitionedMetadataLookup(const std::string& topicName, const uint64_t requestId,
                                                    LookupDataResultPromisePtr promise) {
    // Delegate to the shared lookup path for bookkeeping and timeout handling.
    auto metadataCmd = Commands::newPartitionMetadataRequest(topicName, requestId);
    newLookup(metadataCmd, requestId, promise);
}
/**
 * Common path for lookup-style requests (topic lookup and partitioned metadata).
 *
 * Registers `promise` under `requestId`, arms a timeout timer, enforces the
 * per-connection pending-lookup limit, and finally writes the command.
 * Fails fast with ResultNotConnected / ResultTooManyLookupRequestException.
 */
void ClientConnection::newLookup(const SharedBuffer& cmd, const uint64_t requestId,
                                 LookupDataResultPromisePtr promise) {
    // NOTE: the original allocated an unused `std::shared_ptr<LookupDataResultPtr>`
    // local here; it was never read or written, so it has been removed.
    Lock lock(mutex_);
    if (isClosed()) {
        lock.unlock();
        promise->setFailed(ResultNotConnected);
        return;
    } else if (numOfPendingLookupRequest_ >= maxPendingLookupRequest_) {
        // Back-pressure: too many lookups already in flight on this connection.
        lock.unlock();
        promise->setFailed(ResultTooManyLookupRequestException);
        return;
    }
    LookupRequestData requestData;
    requestData.promise = promise;
    requestData.timer = executor_->createDeadlineTimer();
    requestData.timer->expires_from_now(operationsTimeout_);
    // Capture a weak reference so the timer callback cannot extend the
    // connection's lifetime nor touch a destroyed object.
    auto weakSelf = weak_from_this();
    requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
        auto self = weakSelf.lock();
        if (self) {
            self->handleLookupTimeout(ec, requestData);
        }
    });
    pendingLookupRequests_.insert(std::make_pair(requestId, requestData));
    numOfPendingLookupRequest_++;
    lock.unlock();
    sendCommand(cmd);
}
/**
 * Write a protocol command buffer, serializing with any write already in flight.
 *
 * If no write is pending, the buffer is written immediately (via the TLS strand
 * when applicable); otherwise it is queued and drained by sendPendingCommands().
 */
void ClientConnection::sendCommand(const SharedBuffer& cmd) {
    Lock lock(mutex_);
    const bool writeInProgress = (pendingWriteOperations_++ != 0);
    if (writeInProgress) {
        // Another write is in flight; queue this buffer to be sent afterwards.
        pendingWriteBuffers_.push_back(cmd);
        return;
    }
    if (!tlsSocket_) {
        sendCommandInternal(cmd);
        return;
    }
    // TLS writes must happen on the strand; keep only a weak reference so the
    // posted task cannot keep a dead connection alive.
    auto weakSelf = weak_from_this();
    ASIO::post(strand_, [weakSelf, cmd]() {
        auto self = weakSelf.lock();
        if (self) {
            self->sendCommandInternal(cmd);
        }
    });
}
void ClientConnection::sendCommandInternal(const SharedBuffer& cmd) {
auto self = shared_from_this();
asyncWrite(cmd.const_asio_buffer(),
customAllocWriteHandler([this, self, cmd](const ASIO_ERROR& err, size_t bytesTransferred) {
handleSend(err, cmd);
}));
}
/**
 * Write a SEND (message) command, serializing with any write already in flight.
 *
 * The wire buffer is only materialized (Commands::newSend) at the moment the
 * write is actually issued, on the TLS strand when applicable.
 */
void ClientConnection::sendMessage(const std::shared_ptr<SendArguments>& args) {
    Lock lock(mutex_);
    if (pendingWriteOperations_++ > 0) {
        // A write is already in flight; queue the arguments for later.
        pendingWriteBuffers_.emplace_back(args);
        return;
    }
    auto self = shared_from_this();
    auto writeNow = [this, self, args] {
        BaseCommand outgoingCmd;
        auto buffer = Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args);
        // Keep `buffer` captured: asio does not copy the buffer, so if it were
        // destroyed before the callback fires an invalid range could be handed
        // to the underlying socket send.
        asyncWrite(buffer,
                   customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& err,
                                                                size_t /*bytesTransferred*/) {
                       handleSendPair(err);
                   }));
    };
    if (tlsSocket_) {
        ASIO::post(strand_, writeNow);
    } else {
        writeNow();
    }
}
/// Completion handler for single-buffer writes: drain the queue or tear down.
void ClientConnection::handleSend(const ASIO_ERROR& err, const SharedBuffer&) {
    // Ignore completions that arrive after the connection was closed.
    if (isClosed()) {
        return;
    }
    if (!err) {
        sendPendingCommands();
        return;
    }
    LOG_WARN(cnxString_ << "Could not send message on connection: " << err << " " << err.message());
    close(ResultDisconnected);
}
/// Completion handler for paired-buffer (message) writes: drain the queue or tear down.
void ClientConnection::handleSendPair(const ASIO_ERROR& err) {
    // Ignore completions that arrive after the connection was closed.
    if (isClosed()) {
        return;
    }
    if (!err) {
        sendPendingCommands();
        return;
    }
    LOG_WARN(cnxString_ << "Could not send pair message on connection: " << err << " " << err.message());
    close(ResultDisconnected);
}
// Drain one queued write after a previous write completed.
//
// pendingWriteBuffers_ holds heterogeneous entries (boost::any): either a
// ready-made SharedBuffer (plain command) or a std::shared_ptr<SendArguments>
// whose wire buffer is built lazily here via Commands::newSend. Exactly one
// entry is dequeued per completed write, keeping writes strictly serialized.
void ClientConnection::sendPendingCommands() {
    Lock lock(mutex_);
    if (--pendingWriteOperations_ > 0) {
        // There is at least one more queued write; the counter was incremented
        // by sendCommand()/sendMessage() when the entry was enqueued.
        assert(!pendingWriteBuffers_.empty());
        boost::any any = pendingWriteBuffers_.front();
        pendingWriteBuffers_.pop_front();
        // Keep a strong self-reference alive for the duration of the async write.
        auto self = shared_from_this();
        if (any.type() == typeid(SharedBuffer)) {
            SharedBuffer buffer = boost::any_cast<SharedBuffer>(any);
            asyncWrite(buffer.const_asio_buffer(),
                       customAllocWriteHandler(
                           [this, self, buffer](const ASIO_ERROR& err, size_t) { handleSend(err, buffer); }));
        } else {
            // The only other entry type is a deferred message send.
            assert(any.type() == typeid(std::shared_ptr<SendArguments>));
            auto args = boost::any_cast<std::shared_ptr<SendArguments>>(any);
            BaseCommand outgoingCmd;
            PairSharedBuffer buffer =
                Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args);
            // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the
            // callback is called, an invalid buffer range might be passed to the underlying socket send.
            asyncWrite(buffer, customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& err, size_t) {
                           handleSendPair(err);
                       }));
        }
    } else {
        // No more pending writes
        outgoingBuffer_.reset();
    }
}
/**
 * Send a request command and return a future completed when the matching
 * response (keyed by `requestId`) arrives, or failed on timeout/close.
 */
Future<Result, ResponseData> ClientConnection::sendRequestWithId(SharedBuffer cmd, int requestId) {
    Lock lock(mutex_);
    if (isClosed()) {
        lock.unlock();
        Promise<Result, ResponseData> promise;
        promise.setFailed(ResultNotConnected);
        return promise.getFuture();
    }
    // Arm a per-request timer that fails the promise if no response arrives
    // within operationsTimeout_.
    PendingRequestData requestData;
    requestData.timer = executor_->createDeadlineTimer();
    requestData.timer->expires_from_now(operationsTimeout_);
    auto weakSelf = weak_from_this();
    requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
        if (auto self = weakSelf.lock()) {
            self->handleRequestTimeout(ec, requestData);
        }
    });
    pendingRequests_.emplace(requestId, requestData);
    lock.unlock();
    sendCommand(cmd);
    return requestData.promise.getFuture();
}
/// Timer callback: fail a pending request that never received a response.
void ClientConnection::handleRequestTimeout(const ASIO_ERROR& ec, PendingRequestData pendingRequestData) {
    // A non-zero error code means the timer was cancelled (response arrived or
    // connection closed); also skip if a response was already recorded.
    if (ec || pendingRequestData.hasGotResponse->load()) {
        return;
    }
    pendingRequestData.promise.setFailed(ResultTimeout);
}
/// Timer callback: fail a lookup request that never received a response.
void ClientConnection::handleLookupTimeout(const ASIO_ERROR& ec, LookupRequestData pendingRequestData) {
    // A non-zero error code means the timer was cancelled; nothing to do then.
    if (ec) {
        return;
    }
    pendingRequestData.promise->setFailed(ResultTimeout);
}
/// Timer callback: fail a GetLastMessageId request that never got a response.
void ClientConnection::handleGetLastMessageIdTimeout(const ASIO_ERROR& ec,
                                                     ClientConnection::LastMessageIdRequestData data) {
    // A non-zero error code means the timer was cancelled; nothing to do then.
    if (ec) {
        return;
    }
    data.promise->setFailed(ResultTimeout);
}
// Periodic keep-alive tick: if the previous PING was never answered, force the
// connection closed; otherwise send a new PING and re-arm the timer.
void ClientConnection::handleKeepAliveTimeout() {
    if (isClosed()) {
        return;
    }
    if (havePendingPingRequest_) {
        // The broker never replied (PONG clears this flag elsewhere) within a
        // full keep-alive interval: treat the connection as dead.
        LOG_WARN(cnxString_ << "Forcing connection to close after keep-alive timeout");
        close(ResultDisconnected);
    } else {
        // Send keep alive probe to peer
        LOG_DEBUG(cnxString_ << "Sending ping message");
        havePendingPingRequest_ = true;
        sendCommand(Commands::newPing());
        // If the close operation has already called the keepAliveTimer_.reset() then the use_count will
        // be zero And we do not attempt to dereference the pointer.
        Lock lock(mutex_);
        if (keepAliveTimer_) {
            keepAliveTimer_->expires_from_now(std::chrono::seconds(KeepAliveIntervalInSeconds));
            // Weak capture: the rescheduled callback must not keep a dead
            // connection alive or touch a destroyed one.
            auto weakSelf = weak_from_this();
            keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
                auto self = weakSelf.lock();
                if (self) {
                    self->handleKeepAliveTimeout();
                }
            });
        }
        lock.unlock();
    }
}
/// Timer callback for periodic consumer-stats requests: reschedule unless cancelled.
void ClientConnection::handleConsumerStatsTimeout(const ASIO_ERROR& ec,
                                                  std::vector<uint64_t> consumerStatsRequests) {
    // A non-zero error code means the timer was cancelled; do not reschedule.
    if (ec) {
        LOG_DEBUG(cnxString_ << " Ignoring timer cancelled event, code[" << ec << "]");
        return;
    }
    startConsumerStatsTimer(consumerStatsRequests);
}
// Tear down the connection: close sockets, cancel timers, detach from the
// pool, and fail every outstanding promise with `result`.
//
// The pending-request maps are moved out while mutex_ is held and their
// promises are completed only after it is released, so user callbacks never
// run under the connection lock.
void ClientConnection::close(Result result, bool detach) {
    Lock lock(mutex_);
    if (isClosed()) {
        return;
    }
    state_ = Disconnected;
    // Shut down both directions of the plain socket before closing it.
    if (socket_) {
        ASIO_ERROR err;
        socket_->shutdown(ASIO::socket_base::shutdown_both, err);
        socket_->close(err);
        if (err) {
            LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
        }
    }
    if (tlsSocket_) {
        ASIO_ERROR err;
        tlsSocket_->lowest_layer().close(err);
        if (err) {
            LOG_WARN(cnxString_ << "Failed to close TLS socket: " << err.message());
        }
    }
    if (executor_) {
        executor_.reset();
    }
    // Move the internal fields to process them after `mutex_` was unlocked
    auto consumers = std::move(consumers_);
    auto producers = std::move(producers_);
    auto pendingRequests = std::move(pendingRequests_);
    auto pendingLookupRequests = std::move(pendingLookupRequests_);
    auto pendingConsumerStatsMap = std::move(pendingConsumerStatsMap_);
    auto pendingGetLastMessageIdRequests = std::move(pendingGetLastMessageIdRequests_);
    auto pendingGetNamespaceTopicsRequests = std::move(pendingGetNamespaceTopicsRequests_);
    auto pendingGetSchemaRequests = std::move(pendingGetSchemaRequests_);
    numOfPendingLookupRequest_ = 0;
    // Cancel and release periodic timers so no further callbacks fire.
    if (keepAliveTimer_) {
        keepAliveTimer_->cancel();
        keepAliveTimer_.reset();
    }
    if (consumerStatsRequestTimer_) {
        consumerStatsRequestTimer_->cancel();
        consumerStatsRequestTimer_.reset();
    }
    if (connectTimeoutTask_) {
        connectTimeoutTask_->stop();
    }
    lock.unlock();
    int refCount = weak_from_this().use_count();
    if (!isResultRetryable(result)) {
        LOG_ERROR(cnxString_ << "Connection closed with " << result << " (refCnt: " << refCount << ")");
    } else {
        LOG_INFO(cnxString_ << "Connection disconnected (refCnt: " << refCount << ")");
    }
    // Remove the connection from the pool before completing any promise
    if (detach) {
        pool_.remove(logicalAddress_, physicalAddress_, poolIndex_, this);
    }
    // Notify every attached producer/consumer; the weak_ptr may have expired.
    auto self = shared_from_this();
    for (ProducersMap::iterator it = producers.begin(); it != producers.end(); ++it) {
        auto producer = it->second.lock();
        if (producer) {
            producer->handleDisconnection(result, self);
        }
    }
    for (ConsumersMap::iterator it = consumers.begin(); it != consumers.end(); ++it) {
        auto consumer = it->second.lock();
        if (consumer) {
            consumer->handleDisconnection(result, self);
        }
    }
    self.reset();
    connectPromise_.setFailed(result);
    // Fail all pending requests, all these type are map whose value type contains the Promise object
    for (auto& kv : pendingRequests) {
        kv.second.promise.setFailed(result);
    }
    for (auto& kv : pendingLookupRequests) {
        kv.second.promise->setFailed(result);
    }
    for (auto& kv : pendingConsumerStatsMap) {
        LOG_ERROR(cnxString_ << " Closing Client Connection, please try again later");
        kv.second.setFailed(result);
    }
    for (auto& kv : pendingGetLastMessageIdRequests) {
        kv.second.promise->setFailed(result);
    }
    for (auto& kv : pendingGetNamespaceTopicsRequests) {
        kv.second.setFailed(result);
    }
    for (auto& kv : pendingGetSchemaRequests) {
        kv.second.promise.setFailed(result);
    }
}
bool ClientConnection::isClosed() const { return state_ == Disconnected; }
/// Future tied to connectPromise_; close() fails it with the closing result.
Future<Result, ClientConnectionWeakPtr> ClientConnection::getConnectFuture() {
    return connectPromise_.getFuture();
}
/// Track a producer so broker-initiated commands can be routed back to it.
void ClientConnection::registerProducer(int producerId, ProducerImplPtr producer) {
    Lock lock(mutex_);
    producers_.emplace(producerId, producer);
}
/// Track a consumer so broker-initiated commands can be routed back to it.
void ClientConnection::registerConsumer(int consumerId, ConsumerImplPtr consumer) {
    Lock lock(mutex_);
    consumers_.emplace(consumerId, consumer);
}
/// Stop tracking a producer (no-op if the id is unknown).
void ClientConnection::removeProducer(int producerId) {
    Lock lock(mutex_);
    producers_.erase(producerId);
}
/// Stop tracking a consumer (no-op if the id is unknown).
void ClientConnection::removeConsumer(int consumerId) {
    Lock lock(mutex_);
    consumers_.erase(consumerId);
}
const std::string& ClientConnection::brokerAddress() const { return physicalAddress_; }
const std::string& ClientConnection::cnxString() const { return cnxString_; }
int ClientConnection::getServerProtocolVersion() const { return serverProtocolVersion_; }
int32_t ClientConnection::getMaxMessageSize() { return maxMessageSize_.load(std::memory_order_acquire); }
/// CRC32C checksums are only used with brokers speaking protocol v6 or newer.
Commands::ChecksumType ClientConnection::getChecksumType() const {
    if (getServerProtocolVersion() >= proto::v6) {
        return Commands::Crc32c;
    }
    return Commands::None;
}
/**
 * Request the last message id of the given consumer's topic.
 *
 * Registers a promise keyed by `requestId`, arms the operation timeout, and
 * sends a GET_LAST_MESSAGE_ID command. Completed by
 * handleGetLastMessageIdResponse() or failed on timeout/close.
 */
Future<Result, GetLastMessageIdResponse> ClientConnection::newGetLastMessageId(uint64_t consumerId,
                                                                               uint64_t requestId) {
    Lock lock(mutex_);
    auto promise = std::make_shared<GetLastMessageIdResponsePromisePtr::element_type>();
    if (isClosed()) {
        lock.unlock();
        LOG_ERROR(cnxString_ << " Client is not connected to the broker");
        promise->setFailed(ResultNotConnected);
        return promise->getFuture();
    }
    LastMessageIdRequestData requestData;
    requestData.promise = promise;
    requestData.timer = executor_->createDeadlineTimer();
    requestData.timer->expires_from_now(operationsTimeout_);
    // Weak capture: the timeout callback must not extend the connection's lifetime.
    auto weakSelf = weak_from_this();
    requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
        if (auto self = weakSelf.lock()) {
            self->handleGetLastMessageIdTimeout(ec, requestData);
        }
    });
    pendingGetLastMessageIdRequests_.emplace(requestId, requestData);
    lock.unlock();
    sendCommand(Commands::newGetLastMessageId(consumerId, requestId));
    return promise->getFuture();
}
/**
 * Request the topic list of a namespace. The promise is completed by
 * handleGetTopicOfNamespaceResponse() or failed on close.
 */
Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(
    const std::string& nsName, CommandGetTopicsOfNamespace_Mode mode, uint64_t requestId) {
    Promise<Result, NamespaceTopicsPtr> promise;
    Lock lock(mutex_);
    if (isClosed()) {
        lock.unlock();
        LOG_ERROR(cnxString_ << "Client is not connected to the broker");
        promise.setFailed(ResultNotConnected);
    } else {
        pendingGetNamespaceTopicsRequests_.emplace(requestId, promise);
        lock.unlock();
        sendCommand(Commands::newGetTopicsOfNamespace(nsName, mode, requestId));
    }
    return promise.getFuture();
}
// Request schema info for a topic (optionally a specific version).
//
// Unlike the other request paths, the timeout callback here re-locks mutex_
// and removes the entry from pendingGetSchemaRequests_ itself, so a late
// response and the timeout cannot both complete the promise.
Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& topicName,
                                                          const std::string& version, uint64_t requestId) {
    Lock lock(mutex_);
    Promise<Result, SchemaInfo> promise;
    if (isClosed()) {
        lock.unlock();
        LOG_ERROR(cnxString_ << "Client is not connected to the broker");
        promise.setFailed(ResultNotConnected);
        return promise.getFuture();
    }
    auto timer = executor_->createDeadlineTimer();
    pendingGetSchemaRequests_.emplace(requestId, GetSchemaRequest{promise, timer});
    lock.unlock();
    // Weak capture: the callback bails out if the connection is already gone.
    auto weakSelf = weak_from_this();
    timer->expires_from_now(operationsTimeout_);
    timer->async_wait([this, weakSelf, requestId](const ASIO_ERROR& ec) {
        auto self = weakSelf.lock();
        if (!self) {
            return;
        }
        Lock lock(mutex_);
        auto it = pendingGetSchemaRequests_.find(requestId);
        if (it != pendingGetSchemaRequests_.end()) {
            // Still pending: remove it and fail outside the lock.
            auto promise = std::move(it->second.promise);
            pendingGetSchemaRequests_.erase(it);
            lock.unlock();
            promise.setFailed(ResultTimeout);
        }
    });
    sendCommand(Commands::newGetSchema(topicName, version, requestId));
    return promise.getFuture();
}
/// Forward a broker-reported error to the shared adaptor handling logic.
void ClientConnection::checkServerError(ServerError error, const std::string& message) {
    pulsar::adaptor::checkServerError(*this, error, message);
}
/// Route a SEND_RECEIPT to the owning producer; close if the ack can't be processed.
void ClientConnection::handleSendReceipt(const proto::CommandSendReceipt& sendReceipt) {
    const int producerId = sendReceipt.producer_id();
    const uint64_t sequenceId = sendReceipt.sequence_id();
    const auto messageId = toMessageId(sendReceipt.message_id());
    LOG_DEBUG(cnxString_ << "Got receipt for producer: " << producerId << " -- msg: " << sequenceId
                         << "-- message id: " << messageId);
    Lock lock(mutex_);
    auto it = producers_.find(producerId);
    if (it == producers_.end()) {
        LOG_ERROR(cnxString_ << "Got invalid producer Id in SendReceipt: "  //
                             << producerId << " -- msg: " << sequenceId);
        return;
    }
    ProducerImplPtr producer = it->second.lock();
    lock.unlock();
    if (producer && !producer->ackReceived(sequenceId, messageId)) {
        // If the producer fails to process the ack, close the connection to
        // give it a chance to recover from there.
        close(ResultDisconnected);
    }
}
/// Handle a SEND_ERROR: checksum failures are delegated to the producer,
/// anything else tears down the connection.
void ClientConnection::handleSendError(const proto::CommandSendError& error) {
    LOG_WARN(cnxString_ << "Received send error from server: " << error.message());
    if (ChecksumError != error.error()) {
        // Non-checksum errors are not recoverable on this connection.
        close(ResultDisconnected);
        return;
    }
    long producerId = error.producer_id();
    long sequenceId = error.sequence_id();
    Lock lock(mutex_);
    auto it = producers_.find(producerId);
    if (it == producers_.end()) {
        return;
    }
    ProducerImplPtr producer = it->second.lock();
    lock.unlock();
    if (producer && !producer->removeCorruptMessage(sequenceId)) {
        // If the producer fails to remove the corrupt message, close the
        // connection to give it a chance to recover from there.
        close(ResultDisconnected);
    }
}
/// Complete the pending request matching a SUCCESS response (unknown ids ignored).
void ClientConnection::handleSuccess(const proto::CommandSuccess& success) {
    LOG_DEBUG(cnxString_ << "Received success response from server. req_id: " << success.request_id());
    Lock lock(mutex_);
    auto it = pendingRequests_.find(success.request_id());
    if (it == pendingRequests_.end()) {
        return;
    }
    PendingRequestData requestData = it->second;
    pendingRequests_.erase(it);
    lock.unlock();
    // Complete the promise outside the lock, then stop the request timer.
    requestData.promise.setValue({});
    requestData.timer->cancel();
}
// Complete the pending partition-metadata lookup matching this response.
//
// The entry is removed (and numOfPendingLookupRequest_ decremented) before the
// promise is completed, so user callbacks never run under mutex_.
void ClientConnection::handlePartitionedMetadataResponse(
    const proto::CommandPartitionedTopicMetadataResponse& partitionMetadataResponse) {
    LOG_DEBUG(cnxString_ << "Received partition-metadata response from server. req_id: "
                         << partitionMetadataResponse.request_id());
    Lock lock(mutex_);
    auto it = pendingLookupRequests_.find(partitionMetadataResponse.request_id());
    if (it != pendingLookupRequests_.end()) {
        it->second.timer->cancel();
        LookupDataResultPromisePtr lookupDataPromise = it->second.promise;
        pendingLookupRequests_.erase(it);
        numOfPendingLookupRequest_--;
        lock.unlock();
        // A missing or explicitly Failed response fails the promise.
        if (!partitionMetadataResponse.has_response() ||
            (partitionMetadataResponse.response() ==
             proto::CommandPartitionedTopicMetadataResponse::Failed)) {
            if (partitionMetadataResponse.has_error()) {
                LOG_ERROR(cnxString_ << "Failed partition-metadata lookup req_id: "
                                     << partitionMetadataResponse.request_id()
                                     << " error: " << partitionMetadataResponse.error()
                                     << " msg: " << partitionMetadataResponse.message());
                // Give the adaptor a chance to react to the server error first.
                checkServerError(partitionMetadataResponse.error(), partitionMetadataResponse.message());
                lookupDataPromise->setFailed(
                    getResult(partitionMetadataResponse.error(), partitionMetadataResponse.message()));
            } else {
                LOG_ERROR(cnxString_ << "Failed partition-metadata lookup req_id: "
                                     << partitionMetadataResponse.request_id() << " with empty response: ");
                lookupDataPromise->setFailed(ResultConnectError);
            }
        } else {
            // Success: report only the partition count.
            LookupDataResultPtr lookupResultPtr = std::make_shared<LookupDataResult>();
            lookupResultPtr->setPartitions(partitionMetadataResponse.partitions());
            lookupDataPromise->setValue(lookupResultPtr);
        }
    } else {
        LOG_WARN("Received unknown request id from server: " << partitionMetadataResponse.request_id());
    }
}
// Complete the pending consumer-stats request matching this response.
// The map entry is removed before completing the promise so user callbacks
// never run under mutex_.
void ClientConnection::handleConsumerStatsResponse(
    const proto::CommandConsumerStatsResponse& consumerStatsResponse) {
    LOG_DEBUG(cnxString_ << "ConsumerStatsResponse command - Received consumer stats "
                            "response from server. req_id: "
                         << consumerStatsResponse.request_id());
    Lock lock(mutex_);
    auto it = pendingConsumerStatsMap_.find(consumerStatsResponse.request_id());
    if (it != pendingConsumerStatsMap_.end()) {
        Promise<Result, BrokerConsumerStatsImpl> consumerStatsPromise = it->second;
        pendingConsumerStatsMap_.erase(it);
        lock.unlock();
        if (consumerStatsResponse.has_error_code()) {
            // Broker reported an error: propagate it as the failure result.
            if (consumerStatsResponse.has_error_message()) {
                LOG_ERROR(cnxString_ << " Failed to get consumer stats - "
                                     << consumerStatsResponse.error_message());
            }
            consumerStatsPromise.setFailed(
                getResult(consumerStatsResponse.error_code(), consumerStatsResponse.error_message()));
        } else {
            LOG_DEBUG(cnxString_ << "ConsumerStatsResponse command - Received consumer stats "
                                    "response from server. req_id: "
                                 << consumerStatsResponse.request_id() << " Stats: ");
            // Map the protobuf fields onto the client-facing stats object.
            BrokerConsumerStatsImpl brokerStats(
                consumerStatsResponse.msgrateout(), consumerStatsResponse.msgthroughputout(),
                consumerStatsResponse.msgrateredeliver(), consumerStatsResponse.consumername(),
                consumerStatsResponse.availablepermits(), consumerStatsResponse.unackedmessages(),
                consumerStatsResponse.blockedconsumeronunackedmsgs(), consumerStatsResponse.address(),
                consumerStatsResponse.connectedsince(), consumerStatsResponse.type(),
                consumerStatsResponse.msgrateexpired(), consumerStatsResponse.msgbacklog());
            consumerStatsPromise.setValue(brokerStats);
        }
    } else {
        LOG_WARN("ConsumerStatsResponse command - Received unknown request id from server: "
                 << consumerStatsResponse.request_id());
    }
}
// Complete the pending topic-lookup request matching this response.
// (Method name keeps its historical typo "Respose"; it is referenced by the
// command dispatch switch and must stay unchanged.)
void ClientConnection::handleLookupTopicRespose(
    const proto::CommandLookupTopicResponse& lookupTopicResponse) {
    LOG_DEBUG(cnxString_ << "Received lookup response from server. req_id: "
                         << lookupTopicResponse.request_id());
    Lock lock(mutex_);
    auto it = pendingLookupRequests_.find(lookupTopicResponse.request_id());
    if (it != pendingLookupRequests_.end()) {
        it->second.timer->cancel();
        LookupDataResultPromisePtr lookupDataPromise = it->second.promise;
        pendingLookupRequests_.erase(it);
        numOfPendingLookupRequest_--;
        lock.unlock();
        // A missing or explicitly Failed response fails the promise.
        if (!lookupTopicResponse.has_response() ||
            (lookupTopicResponse.response() == proto::CommandLookupTopicResponse::Failed)) {
            if (lookupTopicResponse.has_error()) {
                LOG_ERROR(cnxString_ << "Failed lookup req_id: " << lookupTopicResponse.request_id()
                                     << " error: " << lookupTopicResponse.error()
                                     << " msg: " << lookupTopicResponse.message());
                // Give the adaptor a chance to react to the server error first.
                checkServerError(lookupTopicResponse.error(), lookupTopicResponse.message());
                lookupDataPromise->setFailed(
                    getResult(lookupTopicResponse.error(), lookupTopicResponse.message()));
            } else {
                LOG_ERROR(cnxString_ << "Failed lookup req_id: " << lookupTopicResponse.request_id()
                                     << " with empty response: ");
                lookupDataPromise->setFailed(ResultConnectError);
            }
        } else {
            LOG_DEBUG(cnxString_ << "Received lookup response from server. req_id: "
                                 << lookupTopicResponse.request_id()  //
                                 << " -- broker-url: " << lookupTopicResponse.brokerserviceurl()
                                 << " -- broker-tls-url: "  //
                                 << lookupTopicResponse.brokerserviceurltls()
                                 << " authoritative: " << lookupTopicResponse.authoritative()  //
                                 << " redirect: " << lookupTopicResponse.response());
            LookupDataResultPtr lookupResultPtr = std::make_shared<LookupDataResult>();
            // Prefer the TLS broker URL when this connection itself uses TLS.
            if (tlsSocket_) {
                lookupResultPtr->setBrokerUrl(lookupTopicResponse.brokerserviceurltls());
            } else {
                lookupResultPtr->setBrokerUrl(lookupTopicResponse.brokerserviceurl());
            }
            lookupResultPtr->setBrokerUrlTls(lookupTopicResponse.brokerserviceurltls());
            lookupResultPtr->setAuthoritative(lookupTopicResponse.authoritative());
            // A Redirect response means the lookup must be retried at the returned broker.
            lookupResultPtr->setRedirect(lookupTopicResponse.response() ==
                                         proto::CommandLookupTopicResponse::Redirect);
            lookupResultPtr->setShouldProxyThroughServiceUrl(lookupTopicResponse.proxy_through_service_url());
            lookupDataPromise->setValue(lookupResultPtr);
        }
    } else {
        LOG_WARN("Received unknown request id from server: " << lookupTopicResponse.request_id());
    }
}
// Complete (or keep pending) the request matching a PRODUCER_SUCCESS response.
//
// When producer_ready() is false the broker has only queued the producer: the
// request stays in pendingRequests_ (so a later PRODUCER_SUCCESS can finish
// it), but hasGotResponse is marked so the timeout handler won't fail it.
void ClientConnection::handleProducerSuccess(const proto::CommandProducerSuccess& producerSuccess) {
    LOG_DEBUG(cnxString_ << "Received success producer response from server. req_id: "
                         << producerSuccess.request_id()  //
                         << " -- producer name: " << producerSuccess.producer_name());
    Lock lock(mutex_);
    auto it = pendingRequests_.find(producerSuccess.request_id());
    if (it != pendingRequests_.end()) {
        PendingRequestData requestData = it->second;
        if (!producerSuccess.producer_ready()) {
            LOG_INFO(cnxString_ << " Producer " << producerSuccess.producer_name()
                                << " has been queued up at broker. req_id: " << producerSuccess.request_id());
            requestData.hasGotResponse->store(true);
            lock.unlock();
        } else {
            pendingRequests_.erase(it);
            lock.unlock();
            // Build the response payload handed back to the waiting caller.
            ResponseData data;
            data.producerName = producerSuccess.producer_name();
            data.lastSequenceId = producerSuccess.last_sequence_id();
            if (producerSuccess.has_schema_version()) {
                data.schemaVersion = producerSuccess.schema_version();
            }
            if (producerSuccess.has_topic_epoch()) {
                data.topicEpoch = boost::make_optional(producerSuccess.topic_epoch());
            } else {
                data.topicEpoch = boost::none;
            }
            requestData.promise.setValue(data);
            requestData.timer->cancel();
        }
    }
}
/**
 * Fail whichever pending request (generic, getLastMessageId, or
 * getTopicsOfNamespace) matches the request id of an ERROR response.
 * Unknown request ids are logged and otherwise ignored.
 */
void ClientConnection::handleError(const proto::CommandError& error) {
    Result result = getResult(error.error(), error.message());
    LOG_WARN(cnxString_ << "Received error response from server: " << result
                        << (error.has_message() ? (" (" + error.message() + ")") : "")
                        << " -- req_id: " << error.request_id());
    Lock lock(mutex_);
    // Check each pending-request map in turn; in every case the entry is
    // removed and the promise is failed only after releasing the lock.
    auto requestIt = pendingRequests_.find(error.request_id());
    if (requestIt != pendingRequests_.end()) {
        PendingRequestData requestData = requestIt->second;
        pendingRequests_.erase(requestIt);
        lock.unlock();
        requestData.promise.setFailed(result);
        requestData.timer->cancel();
        return;
    }
    auto lastMsgIdIt = pendingGetLastMessageIdRequests_.find(error.request_id());
    if (lastMsgIdIt != pendingGetLastMessageIdRequests_.end()) {
        auto getLastMessageIdPromise = lastMsgIdIt->second.promise;
        pendingGetLastMessageIdRequests_.erase(lastMsgIdIt);
        lock.unlock();
        getLastMessageIdPromise->setFailed(result);
        return;
    }
    auto topicsIt = pendingGetNamespaceTopicsRequests_.find(error.request_id());
    if (topicsIt != pendingGetNamespaceTopicsRequests_.end()) {
        Promise<Result, NamespaceTopicsPtr> getNamespaceTopicsPromise = topicsIt->second;
        pendingGetNamespaceTopicsRequests_.erase(topicsIt);
        lock.unlock();
        getNamespaceTopicsPromise.setFailed(result);
        return;
    }
    lock.unlock();
}
/// Pick the migrated broker URL matching this connection's transport:
/// TLS connections use the TLS URL, plain connections the plain URL.
/// Returns an empty string when the matching URL is absent.
std::string ClientConnection::getMigratedBrokerServiceUrl(
    const proto::CommandTopicMigrated& commandTopicMigrated) {
    if (tlsSocket_ && commandTopicMigrated.has_brokerserviceurltls()) {
        return commandTopicMigrated.brokerserviceurltls();
    }
    if (!tlsSocket_ && commandTopicMigrated.has_brokerserviceurl()) {
        return commandTopicMigrated.brokerserviceurl();
    }
    return "";
}
/**
 * Handle a TOPIC_MIGRATED notification: point the affected producer/consumer
 * at the migrated cluster URL and drop its first pending connect request.
 */
void ClientConnection::handleTopicMigrated(const proto::CommandTopicMigrated& commandTopicMigrated) {
    const long resourceId = commandTopicMigrated.resource_id();
    const std::string migratedBrokerServiceUrl = getMigratedBrokerServiceUrl(commandTopicMigrated);
    if (migratedBrokerServiceUrl.empty()) {
        LOG_WARN("Failed to find the migrated broker url for resource:"
                 << resourceId
                 << (commandTopicMigrated.has_brokerserviceurl()
                         ? ", migratedBrokerUrl: " + commandTopicMigrated.brokerserviceurl()
                         : "")
                 << (commandTopicMigrated.has_brokerserviceurltls()
                         ? ", migratedBrokerUrlTls: " + commandTopicMigrated.brokerserviceurltls()
                         : ""));
        return;
    }
    Lock lock(mutex_);
    if (commandTopicMigrated.resource_type() == proto::CommandTopicMigrated_ResourceType_Producer) {
        auto it = producers_.find(resourceId);
        // BUG FIX: the weak_ptr may have expired; the original dereferenced
        // the result of lock() without a null check (cf. handleCloseProducer).
        auto producer = (it != producers_.end()) ? it->second.lock() : ProducerImplPtr();
        if (producer) {
            producer->setRedirectedClusterURI(migratedBrokerServiceUrl);
            unsafeRemovePendingRequest(producer->firstRequestIdAfterConnect());
            LOG_INFO("Producer id:" << resourceId << " is migrated to " << migratedBrokerServiceUrl);
        } else {
            LOG_WARN("Got invalid producer Id in topicMigrated command: " << resourceId);
        }
    } else {
        auto it = consumers_.find(resourceId);
        // Same null guard for consumers.
        auto consumer = (it != consumers_.end()) ? it->second.lock() : ConsumerImplPtr();
        if (consumer) {
            consumer->setRedirectedClusterURI(migratedBrokerServiceUrl);
            unsafeRemovePendingRequest(consumer->firstRequestIdAfterConnect());
            LOG_INFO("Consumer id:" << resourceId << " is migrated to " << migratedBrokerServiceUrl);
        } else {
            LOG_WARN("Got invalid consumer Id in topicMigrated command: " << resourceId);
        }
    }
}
/// Extract the assigned broker URL matching this connection's transport
/// (TLS URL for TLS connections, plain URL otherwise); none when absent.
boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
    const proto::CommandCloseProducer& closeProducer) {
    if (tlsSocket_ && closeProducer.has_assignedbrokerserviceurltls()) {
        return closeProducer.assignedbrokerserviceurltls();
    }
    if (!tlsSocket_ && closeProducer.has_assignedbrokerserviceurl()) {
        return closeProducer.assignedbrokerserviceurl();
    }
    return boost::none;
}
/// Extract the assigned broker URL matching this connection's transport
/// (TLS URL for TLS connections, plain URL otherwise); none when absent.
boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
    const proto::CommandCloseConsumer& closeConsumer) {
    if (tlsSocket_ && closeConsumer.has_assignedbrokerserviceurltls()) {
        return closeConsumer.assignedbrokerserviceurltls();
    }
    if (!tlsSocket_ && closeConsumer.has_assignedbrokerserviceurl()) {
        return closeConsumer.assignedbrokerserviceurl();
    }
    return boost::none;
}
/// Broker-initiated producer close: detach the producer and notify it,
/// passing along a reassigned broker URL when one was provided.
void ClientConnection::handleCloseProducer(const proto::CommandCloseProducer& closeProducer) {
    int producerId = closeProducer.producer_id();
    LOG_DEBUG("Broker notification of Closed producer: " << producerId);
    Lock lock(mutex_);
    auto it = producers_.find(producerId);
    if (it == producers_.end()) {
        LOG_ERROR(cnxString_ << "Got invalid producer Id in closeProducer command: " << producerId);
        return;
    }
    ProducerImplPtr producer = it->second.lock();
    producers_.erase(it);
    lock.unlock();
    if (producer) {
        producer->disconnectProducer(getAssignedBrokerServiceUrl(closeProducer));
    }
}
/// Broker-initiated consumer close: detach the consumer and notify it,
/// passing along a reassigned broker URL when one was provided.
void ClientConnection::handleCloseConsumer(const proto::CommandCloseConsumer& closeconsumer) {
    int consumerId = closeconsumer.consumer_id();
    LOG_DEBUG("Broker notification of Closed consumer: " << consumerId);
    Lock lock(mutex_);
    auto it = consumers_.find(consumerId);
    if (it == consumers_.end()) {
        LOG_ERROR(cnxString_ << "Got invalid consumer Id in closeConsumer command: " << consumerId);
        return;
    }
    ConsumerImplPtr consumer = it->second.lock();
    consumers_.erase(it);
    lock.unlock();
    if (consumer) {
        consumer->disconnectConsumer(getAssignedBrokerServiceUrl(closeconsumer));
    }
}
/**
 * Respond to an AUTH_CHALLENGE from the broker by producing and writing a
 * fresh auth response; closes the connection if the response cannot be built.
 */
void ClientConnection::handleAuthChallenge() {
    LOG_DEBUG(cnxString_ << "Received auth challenge from broker");
    // Initialize explicitly: the original left `result` uninitialized before
    // branching on it, relying on newAuthResponse always assigning the
    // out-parameter.
    Result result = ResultOk;
    SharedBuffer buffer = Commands::newAuthResponse(authentication_, result);
    if (result != ResultOk) {
        LOG_ERROR(cnxString_ << "Failed to send auth response: " << result);
        close(result);
        return;
    }
    // Keep a strong self-reference and the buffer alive until the write completes.
    auto self = shared_from_this();
    asyncWrite(buffer.const_asio_buffer(),
               customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& err, size_t) {
                   handleSentAuthResponse(err, buffer);
               }));
}
/// Complete the pending getLastMessageId request matching this response.
void ClientConnection::handleGetLastMessageIdResponse(
    const proto::CommandGetLastMessageIdResponse& getLastMessageIdResponse) {
    LOG_DEBUG(cnxString_ << "Received getLastMessageIdResponse from server. req_id: "
                         << getLastMessageIdResponse.request_id());
    Lock lock(mutex_);
    auto it = pendingGetLastMessageIdRequests_.find(getLastMessageIdResponse.request_id());
    if (it == pendingGetLastMessageIdRequests_.end()) {
        lock.unlock();
        LOG_WARN("getLastMessageIdResponse command - Received unknown request id from server: "
                 << getLastMessageIdResponse.request_id());
        return;
    }
    auto getLastMessageIdPromise = it->second.promise;
    pendingGetLastMessageIdRequests_.erase(it);
    lock.unlock();
    // The mark-delete position is optional; include it only when present.
    if (getLastMessageIdResponse.has_consumer_mark_delete_position()) {
        getLastMessageIdPromise->setValue(
            {toMessageId(getLastMessageIdResponse.last_message_id()),
             toMessageId(getLastMessageIdResponse.consumer_mark_delete_position())});
    } else {
        getLastMessageIdPromise->setValue({toMessageId(getLastMessageIdResponse.last_message_id())});
    }
}
/**
 * Complete the pending getTopicsOfNamespace request matching this response.
 *
 * Partitioned topics come back as one entry per partition
 * ("xxx-partition-N"); the partition suffix is stripped and duplicates are
 * collapsed via a std::set before handing the list to the caller.
 */
void ClientConnection::handleGetTopicOfNamespaceResponse(
    const proto::CommandGetTopicsOfNamespaceResponse& response) {
    LOG_DEBUG(cnxString_ << "Received GetTopicsOfNamespaceResponse from server. req_id: "
                         << response.request_id() << " topicsSize" << response.topics_size());
    Lock lock(mutex_);
    auto it = pendingGetNamespaceTopicsRequests_.find(response.request_id());
    if (it != pendingGetNamespaceTopicsRequests_.end()) {
        Promise<Result, NamespaceTopicsPtr> getTopicsPromise = it->second;
        pendingGetNamespaceTopicsRequests_.erase(it);
        lock.unlock();
        int numTopics = response.topics_size();
        std::set<std::string> topicSet;
        for (int i = 0; i < numTopics; i++) {
            const std::string& topicName = response.topics(i);
            // Strip the partition suffix. find() returns std::string::npos when
            // there is none, and substr(0, npos) yields the whole name.
            // BUG FIX: the original stored the result of find() in an `int`,
            // truncating npos; use the proper unsigned type instead.
            const auto pos = topicName.find("-partition-");
            // std::set::insert already ignores duplicates, so no pre-check needed.
            topicSet.insert(topicName.substr(0, pos));
        }
        NamespaceTopicsPtr topicsPtr =
            std::make_shared<std::vector<std::string>>(topicSet.begin(), topicSet.end());
        getTopicsPromise.setValue(topicsPtr);
    } else {
        lock.unlock();
        LOG_WARN(
            "GetTopicsOfNamespaceResponse command - Received unknown request id from "
            "server: "
            << response.request_id());
    }
}
/// Complete the pending getSchema request matching this response.
void ClientConnection::handleGetSchemaResponse(const proto::CommandGetSchemaResponse& response) {
    LOG_DEBUG(cnxString_ << "Received GetSchemaResponse from server. req_id: " << response.request_id());
    Lock lock(mutex_);
    auto it = pendingGetSchemaRequests_.find(response.request_id());
    if (it == pendingGetSchemaRequests_.end()) {
        lock.unlock();
        LOG_WARN(
            "GetSchemaResponse command - Received unknown request id from "
            "server: "
            << response.request_id());
        return;
    }
    Promise<Result, SchemaInfo> getSchemaPromise = it->second.promise;
    pendingGetSchemaRequests_.erase(it);
    lock.unlock();
    if (response.has_error_code()) {
        Result result = getResult(response.error_code(), response.error_message());
        // TopicNotFound is an expected outcome; don't warn about it.
        if (response.error_code() != proto::TopicNotFound) {
            LOG_WARN(cnxString_ << "Received error GetSchemaResponse from server " << result
                                << (response.has_error_message() ? (" (" + response.error_message() + ")")
                                                                 : "")
                                << " -- req_id: " << response.request_id());
        }
        getSchemaPromise.setFailed(result);
        return;
    }
    // Translate the protobuf schema (type, data, properties) into SchemaInfo.
    const auto& schema = response.schema();
    StringMap properties;
    for (const auto& kv : schema.properties()) {
        properties[kv.key()] = kv.value();
    }
    getSchemaPromise.setValue(
        SchemaInfo(static_cast<SchemaType>(schema.type()), "", schema.schema_data(), properties));
}
/// Complete the pending request matching an ACK_RESPONSE: fail it when the
/// response carries an error, otherwise resolve it.
void ClientConnection::handleAckResponse(const proto::CommandAckResponse& response) {
    LOG_DEBUG(cnxString_ << "Received AckResponse from server. req_id: " << response.request_id());
    Lock lock(mutex_);
    auto it = pendingRequests_.find(response.request_id());
    if (it == pendingRequests_.cend()) {
        lock.unlock();
        LOG_WARN("Cannot find the cached AckResponse whose req_id is " << response.request_id());
        return;
    }
    auto promise = it->second.promise;
    pendingRequests_.erase(it);
    lock.unlock();
    if (!response.has_error()) {
        promise.setValue({});
    } else {
        promise.setFailed(getResult(response.error(), ""));
    }
}
/// Fail and drop a pending request without taking mutex_ ("unsafe"): the
/// callers in this file (handleTopicMigrated) already hold the lock.
void ClientConnection::unsafeRemovePendingRequest(long requestId) {
    auto it = pendingRequests_.find(requestId);
    if (it == pendingRequests_.end()) {
        return;
    }
    it->second.promise.setFailed(ResultDisconnected);
    // Cancel with an error-code overload so a late timer cannot throw here.
    ASIO_ERROR ec;
    it->second.timer->cancel(ec);
    pendingRequests_.erase(it);
}
} // namespace pulsar