blob: bcb35be7a64bdf6b8b224c1a217c19fda0b8db44 [file] [log] [blame]
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ClientConnection.h"
#include "PulsarApi.pb.h"
#include <boost/shared_ptr.hpp>
#include <boost/array.hpp>
#include <iostream>
#include <algorithm>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/date_time/gregorian/gregorian.hpp>
#include <boost/filesystem.hpp>
#include <climits>
#include "ExecutorService.h"
#include "Commands.h"
#include "LogUtils.h"
#include "Url.h"
#include <boost/bind.hpp>
#include <string>
#include "ProducerImpl.h"
#include "ConsumerImpl.h"
#include "checksum/ChecksumProvider.h"
DECLARE_LOG_OBJECT()
using namespace pulsar::proto;
using namespace boost::asio::ip;
namespace pulsar {
// Initial size (and regrowth floor) of the incoming/outgoing frame buffers.
static const uint32_t DefaultBufferSize = 64 * 1024;
// Interval between Pulsar-protocol PING probes sent to the broker.
static const int KeepAliveIntervalInSeconds = 30;
// Convert error codes from the protobuf wire protocol (proto::ServerError)
// to the client-facing API Result enum.
//
// Deliberately has no `default:` case (see note at the bottom): if a new
// ServerError value is added to the protocol, the compiler's switch
// exhaustiveness warning will point here.
static Result getResult(ServerError serverError) {
switch (serverError) {
case UnknownError:
return ResultUnknownError;
case MetadataError:
return ResultBrokerMetadataError;
case ChecksumError:
return ResultChecksumError;
case PersistenceError:
return ResultBrokerPersistenceError;
case AuthenticationError:
return ResultAuthenticationError;
case AuthorizationError:
return ResultAuthorizationError;
case ConsumerBusy:
return ResultConsumerBusy;
case ServiceNotReady:
return ResultServiceUnitNotReady;
case ProducerBlockedQuotaExceededError:
return ResultProducerBlockedQuotaExceededError;
case ProducerBlockedQuotaExceededException:
return ResultProducerBlockedQuotaExceededException;
case TopicNotFound:
return ResultTopicNotFound;
case SubscriptionNotFound:
return ResultSubscriptionNotFound;
case ConsumerNotFound:
return ResultConsumerNotFound;
case UnsupportedVersionError:
return ResultUnsupportedVersionError;
}
// NOTE : Do not add default case in the switch above. In future if we get new cases for
// ServerError and miss them in the switch above we would like to get notified. Adding
// return here to make the compiler happy.
return ResultUnknownError;
}
/**
 * Construct a (not yet connected) broker connection.
 *
 * Initializes all members; if TLS is enabled in the client configuration it
 * also builds the SSL context (verification mode, trust store, optional
 * client cert/key from the authentication plugin) and wraps the TCP socket
 * in a TLS stream. On any TLS configuration error the connection is closed
 * immediately and the constructor returns early.
 *
 * @param endpoint            broker address string (e.g. "pulsar://host:6650")
 * @param executor            executor providing the io_service, resolver, sockets and timers
 * @param clientConfiguration client settings (timeouts, TLS, lookup concurrency)
 * @param authentication      auth provider; may also supply TLS cert/key material
 */
ClientConnection::ClientConnection(const std::string& endpoint, ExecutorServicePtr executor,
const ClientConfiguration& clientConfiguration, const AuthenticationPtr& authentication)
: state_(Pending),
operationsTimeout_(seconds(clientConfiguration.getOperationTimeoutSeconds())),
authentication_(authentication),
serverProtocolVersion_(ProtocolVersion_MIN),
executor_(executor),
resolver_(executor->createTcpResolver()),
socket_(executor->createSocket()),
address_(endpoint),
// Placeholder until the local endpoint is known (see handleTcpConnected).
cnxString_("[<none> -> " + endpoint + "] "),
error_(boost::system::error_code()),
incomingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
incomingCmd_(),
pendingWriteBuffers_(),
pendingWriteOperations_(0),
outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
outgoingCmd_(),
havePendingPingRequest_(false),
keepAliveTimer_(),
maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()),
consumerStatsRequestTimer_(executor_->createDeadlineTimer()),
// Consumer stats responses are considered fresh for 30 seconds.
consumerStatsTTLMs_(30 * 1000),
numOfPendingLookupRequest_(0),
isTlsAllowInsecureConnection_(false) {
if (clientConfiguration.isUseTls()) {
using namespace boost::filesystem;
// Prefer TLS 1.2 when the boost version supports it.
#if BOOST_VERSION >= 105400
boost::asio::ssl::context ctx(executor_->io_service_, boost::asio::ssl::context::tlsv12_client);
#else
boost::asio::ssl::context ctx(executor_->io_service_, boost::asio::ssl::context::tlsv1_client);
#endif
if (clientConfiguration.isTlsAllowInsecureConnection()) {
// Insecure mode: skip peer certificate verification entirely.
ctx.set_verify_mode(boost::asio::ssl::context::verify_none);
isTlsAllowInsecureConnection_ = true;
} else {
ctx.set_verify_mode(boost::asio::ssl::context::verify_peer);
std::string trustCertFilePath = clientConfiguration.getTlsTrustCertsFilePath();
if (exists(path(trustCertFilePath))) {
ctx.load_verify_file(trustCertFilePath);
} else {
LOG_ERROR(trustCertFilePath << ": No such trustCertFile");
close();
return;
}
}
// If the auth plugin supplies TLS client credentials, load them for
// mutual TLS. Missing files are treated as fatal configuration errors.
AuthenticationDataPtr authData;
if (authentication_->getAuthData(authData) == ResultOk && authData->hasDataForTls()){
std::string tlsCertificates = authData->getTlsCertificates();
std::string tlsPrivateKey = authData->getTlsPrivateKey();
if (exists(path(tlsCertificates))) {
ctx.use_certificate_file(tlsCertificates, boost::asio::ssl::context::pem);
} else {
LOG_ERROR(tlsCertificates << ": No such tlsCertificates");
close();
return;
}
if (exists(path(tlsPrivateKey))) {
ctx.use_private_key_file(tlsPrivateKey, boost::asio::ssl::context::pem);
} else {
LOG_ERROR(tlsPrivateKey << ": No such tlsPrivateKey");
close();
return;
}
}
// NOTE(review): `ctx` is a local that is destroyed when this constructor
// returns; this relies on createTlsSocket/ssl::stream retaining what it
// needs from the context — confirm against ExecutorService's implementation.
tlsSocket_ = executor->createTlsSocket(socket_, ctx);
}
}
// Destructor: only logs; socket/timer cleanup is handled elsewhere (close())
// and by the members' own destructors.
ClientConnection::~ClientConnection() {
LOG_INFO(cnxString_ << "Destroyed connection");
}
/**
 * Handle the broker's CONNECTED response: mark the connection Ready, record
 * the broker's protocol version, fulfill the connect promise, and start the
 * version-gated periodic tasks (keep-alive pings, consumer stats timer).
 */
void ClientConnection::handlePulsarConnected(const CommandConnected& cmdConnected) {
if (!cmdConnected.has_server_version()) {
LOG_ERROR(cnxString_ << "Server version is not set");
close();
return;
}
state_ = Ready;
serverProtocolVersion_ = cmdConnected.protocol_version();
// Waiters on getConnectionAsync-style futures are unblocked here.
connectPromise_.setValue(shared_from_this());
if (serverProtocolVersion_ >= v1) {
// Only send keep-alive probes if the broker supports it
keepAliveTimer_ = executor_->createDeadlineTimer();
keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
keepAliveTimer_->async_wait(
boost::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
}
if (serverProtocolVersion_ >= v7) {
// Kick off the consumer-stats timeout loop with no outstanding requests.
startConsumerStatsTimer(std::vector<uint64_t>());
}
}
void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerStatsRequests) {
std::vector<Promise<Result, BrokerConsumerStats> > consumerStatsPromises;
Lock lock(mutex_);
for (int i = 0; i < consumerStatsRequests.size(); i++) {
PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap_.find(consumerStatsRequests[i]);
if (it != pendingConsumerStatsMap_.end()) {
LOG_DEBUG(cnxString_ << " removing request_id " << it->first << " from the pendingConsumerStatsMap_");
consumerStatsPromises.push_back(it->second);
pendingConsumerStatsMap_.erase(it);
} else {
LOG_DEBUG(cnxString_ << "request_id " << it->first << " already fulfilled - not removing it");
}
}
consumerStatsRequests.clear();
for (PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap_.begin();
it != pendingConsumerStatsMap_.end(); ++it) {
consumerStatsRequests.push_back(it->first);
}
consumerStatsRequestTimer_->expires_from_now(operationsTimeout_);
consumerStatsRequestTimer_->async_wait(
boost::bind(&ClientConnection::handleConsumerStatsTimeout, shared_from_this(),
boost::asio::placeholders::error, consumerStatsRequests));
lock.unlock();
// Complex logic since promises need to be fulfilled outside the lock
for (int i = 0; i < consumerStatsPromises.size(); i++) {
consumerStatsPromises[i].setFailed(ResultTimeout);
LOG_WARN(cnxString_ << " Operation timedout, didn't get response from broker");
}
}
// Boost.Asio has no portable wrappers for these TCP keep-alive knobs, so we
// define thin socket_option wrappers over the raw setsockopt constants.
// NOTE(review): boost::asio::detail is not public API — these typedefs depend
// on boost internals and may need revisiting on boost upgrades.
/// The number of unacknowledged probes to send before considering the connection dead and notifying the application layer
typedef boost::asio::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPCNT> tcp_keep_alive_count;
/// The interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime
typedef boost::asio::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPINTVL> tcp_keep_alive_interval;
/// The interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive
/// probe; after the connection is marked to need keepalive, this counter is not used any further
#ifdef __APPLE__
// macOS spells the idle-time option TCP_KEEPALIVE instead of TCP_KEEPIDLE.
typedef boost::asio::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPALIVE> tcp_keep_alive_idle;
#else
typedef boost::asio::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPIDLE> tcp_keep_alive_idle;
#endif
/*
 * TCP Connect handler
 *
 * if async_connect without any error, connected_ would be set to true
 * at this point the connection is deemed valid to be used by clients of this class
 */
void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
tcp::resolver::iterator endpointIterator) {
if (!err) {
// Now that both endpoints are known, build the "[local -> remote]" prefix
// used by every log line on this connection.
std::stringstream cnxStringStream;
cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint() << "] ";
cnxString_ = cnxStringStream.str();
LOG_INFO(cnxString_ << "Connected to broker");
state_ = TcpConnected;
socket_->set_option(tcp::no_delay(true));
socket_->set_option(tcp::socket::keep_alive(true));
// Start TCP keep-alive probes after connection has been idle after 1 minute. Ideally this
// should never happen, given that we're sending our own keep-alive probes (within the TCP
// connection) every 30 seconds
socket_->set_option(tcp_keep_alive_idle(1 * 60));
// Send up to 10 probes before declaring the connection broken
socket_->set_option(tcp_keep_alive_count(10));
// Interval between probes: 6 seconds
socket_->set_option(tcp_keep_alive_interval(6));
if (tlsSocket_) {
if (!isTlsAllowInsecureConnection_) {
// NOTE(review): this local `err` shadows the handler's parameter and is
// always default-constructed ("success") — the LOG_ERROR below therefore
// prints a meaningless error code. Also, `service_url` is parsed only to
// validate the address; the parsed result is otherwise unused here.
boost::system::error_code err;
Url service_url;
if (!Url::parse(address_, service_url)) {
LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message());
close();
return;
}
}
// Begin the TLS handshake; the CONNECT command is sent from handleHandshake.
tlsSocket_->async_handshake(boost::asio::ssl::stream<tcp::socket>::client, boost::bind(&ClientConnection::handleHandshake, shared_from_this(), boost::asio::placeholders::error));
} else {
// Plain TCP: skip straight to the post-handshake path with a success code.
handleHandshake(boost::system::errc::make_error_code(boost::system::errc::success));
}
} else if (endpointIterator != tcp::resolver::iterator()) {
// The connection failed. Try the next endpoint in the list.
socket_->close();
tcp::endpoint endpoint = *endpointIterator;
socket_->async_connect(
endpoint,
boost::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
boost::asio::placeholders::error, ++endpointIterator));
} else {
// All resolved endpoints exhausted.
LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
close();
return;
}
}
/**
 * Post-handshake handler (TLS handshake completion, or invoked directly with
 * a success code for plain TCP). Sends the Pulsar CONNECT command.
 *
 * BUG FIX: the error code was previously ignored, so a failed TLS handshake
 * still attempted to write CONNECT on the broken stream. Fail fast instead.
 */
void ClientConnection::handleHandshake(const boost::system::error_code& err) {
    if (err) {
        LOG_ERROR(cnxString_ << "Handshake failed: " << err.message());
        close();
        return;
    }

    SharedBuffer buffer = Commands::newConnect(authentication_);
    // Send CONNECT command to broker
    asyncWrite(
            buffer.const_asio_buffer(),
            boost::bind(&ClientConnection::handleSentPulsarConnect, shared_from_this(),
                        boost::asio::placeholders::error, buffer));
}
/**
 * Completion handler for the CONNECT frame write. On success, start reading
 * the broker's CONNECTED response; on failure, tear the connection down.
 */
void ClientConnection::handleSentPulsarConnect(const boost::system::error_code& err,
                                               const SharedBuffer& buffer) {
    if (!err) {
        // Schedule the reading of CONNECTED command from broker
        readNextCommand();
        return;
    }

    LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
    close();
}
/*
 * Async method to establish TCP connection with broker
 *
 * tcpConnectCompletionHandler is notified when the result of this call is available.
 *
 * Validates the service URL (scheme must be pulsar:// or pulsar+ssl://) and
 * starts the async DNS resolution; handleResolve continues the chain.
 */
void ClientConnection::tcpConnectAsync() {
    Url service_url;
    if (!Url::parse(address_, service_url)) {
        // BUG FIX: previously a default-constructed error_code (always
        // "success") was printed here; log the offending address instead.
        LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << address_);
        close();
        return;
    }

    if (service_url.protocol() != "pulsar" && service_url.protocol() != "pulsar+ssl") {
        LOG_ERROR(cnxString_ << "Invalid Url protocol '" << service_url.protocol()
                             << "'. Valid values are 'pulsar' and 'pulsar+ssl'");
        close();
        return;
    }

    LOG_DEBUG(cnxString_ << "Connecting to " << service_url.host() << ":" << service_url.port());
    tcp::resolver::query query(service_url.host(),
                               boost::lexical_cast<std::string>(service_url.port()));
    resolver_->async_resolve(
            query,
            boost::bind(&ClientConnection::handleResolve, shared_from_this(),
                        boost::asio::placeholders::error, boost::asio::placeholders::iterator));
}
/**
 * DNS resolution handler: start an async connect to the first resolved
 * endpoint, passing the advanced iterator so handleTcpConnected can fall
 * back to the remaining endpoints on failure.
 */
void ClientConnection::handleResolve(const boost::system::error_code& err,
                                     tcp::resolver::iterator endpointIterator) {
    if (err) {
        LOG_ERROR(cnxString_ << "Resolve error: " << err << " : " << err.message());
        close();
        return;
    }

    if (endpointIterator != tcp::resolver::iterator()) {
        LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name()  //
                             << " to " << endpointIterator->endpoint());
        // BUG FIX: the original `async_connect(*endpointIterator++, bind(...,
        // endpointIterator))` read and modified the iterator in sibling
        // arguments — unsequenced (UB) before C++17, so the handler could
        // receive either the advanced or the original iterator. Sequence the
        // increment explicitly.
        const tcp::endpoint endpoint = *endpointIterator;
        ++endpointIterator;
        socket_->async_connect(
                endpoint,
                boost::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
                            boost::asio::placeholders::error, endpointIterator));
    } else {
        LOG_WARN(cnxString_ << "No IP address found");
        close();
        return;
    }
}
// Begin reading the next frame. We first require at least 4 bytes — the
// frame-length prefix — before processIncomingBuffer can make progress.
void ClientConnection::readNextCommand() {
const static uint32_t minReadSize = sizeof(uint32_t);
asyncReceive(
incomingBuffer_.asio_buffer(),
customAllocReadHandler(
boost::bind(&ClientConnection::handleRead, shared_from_this(), _1, _2,
minReadSize)));
}
/**
 * Socket read handler: accumulate bytes until at least `minReadSize` are
 * available, then hand the buffer to processIncomingBuffer.
 *
 * @param err              read error, if any (connection is closed on error)
 * @param bytesTransferred bytes received into the buffer in this read
 * @param minReadSize      bytes still required before the pending frame
 *                         (or length prefix) is complete
 */
void ClientConnection::handleRead(const boost::system::error_code& err, size_t bytesTransferred,
uint32_t minReadSize) {
// Update buffer write idx with new data
incomingBuffer_.bytesWritten(bytesTransferred);
if (err || bytesTransferred == 0) {
// Error or EOF: tear down the connection.
close();
} else if (bytesTransferred < minReadSize) {
// Read the remaining part, use a slice of buffer to write on the next
// region
SharedBuffer buffer = incomingBuffer_.slice(bytesTransferred);
asyncReceive(
buffer.asio_buffer(),
customAllocReadHandler(
boost::bind(&ClientConnection::handleRead, shared_from_this(), _1, _2,
minReadSize - bytesTransferred)));
} else {
processIncomingBuffer();
}
}
/**
 * Decode and dispatch all complete frames currently in incomingBuffer_.
 *
 * Wire format per frame: [4-byte total frame size][4-byte command size]
 * [command protobuf][for MESSAGE: optional checksum][4-byte metadata size]
 * [metadata protobuf][payload].
 *
 * If a frame is incomplete, schedules another read (growing the buffer when
 * the frame cannot fit) and returns; handleRead re-enters this function once
 * enough bytes arrive.
 */
void ClientConnection::processIncomingBuffer() {
// Process all the available frames from the incoming buffer
while (incomingBuffer_.readableBytes() >= sizeof(uint32_t)) {
// Extract message frames from incoming buffer
// At this point we have at least 4 bytes in the buffer
uint32_t frameSize = incomingBuffer_.readUnsignedInt();
if (frameSize > incomingBuffer_.readableBytes()) {
// We don't have the entire frame yet
const uint32_t bytesToReceive = frameSize - incomingBuffer_.readableBytes();
// Rollback the reading of frameSize (when the frame will be complete,
// we'll read it again
incomingBuffer_.rollback(sizeof(uint32_t));
if (bytesToReceive <= incomingBuffer_.writableBytes()) {
// The rest of the frame still fits in the current buffer
asyncReceive(
incomingBuffer_.asio_buffer(),
customAllocReadHandler(
boost::bind(&ClientConnection::handleRead, shared_from_this(), _1,
_2, bytesToReceive)));
return;
} else {
// Need to allocate a buffer big enough for the frame
uint32_t newBufferSize = std::max<uint32_t>(DefaultBufferSize,
frameSize + sizeof(uint32_t));
// copyFrom preserves the bytes already read while enlarging capacity.
incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_, newBufferSize);
asyncReceive(
incomingBuffer_.asio_buffer(),
customAllocReadHandler(
boost::bind(&ClientConnection::handleRead, shared_from_this(), _1,
_2, bytesToReceive)));
return;
}
}
// At this point, we have at least one complete frame available in the buffer
uint32_t cmdSize = incomingBuffer_.readUnsignedInt();
if (!incomingCmd_.ParseFromArray(incomingBuffer_.data(), cmdSize)) {
LOG_ERROR(cnxString_ << "Error parsing protocol buffer command");
close();
return;
}
incomingBuffer_.consume(cmdSize);
if (incomingCmd_.type() == BaseCommand::MESSAGE) {
// Parse message metadata and extract payload
MessageMetadata msgMetadata;
//read checksum
// verifyChecksum consumes the magic+checksum bytes when present,
// or rewinds when the frame carries no checksum.
bool isChecksumValid = verifyChecksum(incomingBuffer_, incomingCmd_);
uint32_t metadataSize = incomingBuffer_.readUnsignedInt();
if (!msgMetadata.ParseFromArray(incomingBuffer_.data(), metadataSize)) {
LOG_ERROR(cnxString_ << "[consumer id " << incomingCmd_.message().consumer_id() //
<< ", message ledger id " << incomingCmd_.message().message_id().ledgerid() //
<< ", entry id " << incomingCmd_.message().message_id().entryid() << "] Error parsing message metadata");
close();
return;
}
incomingBuffer_.consume(metadataSize);
// Payload is whatever remains after the cmd and metadata sections
// (each preceded by its own 4-byte length).
// NOTE(review): assumes the broker-sent sizes are consistent; a
// malformed frame could make this subtraction wrap — confirm upstream
// validation.
uint32_t payloadSize = frameSize - (cmdSize + 4) - (metadataSize + 4);
SharedBuffer payload = SharedBuffer::copy(incomingBuffer_.data(), payloadSize);
incomingBuffer_.consume(payloadSize);
handleIncomingMessage(incomingCmd_.message(), isChecksumValid, msgMetadata, payload);
} else {
handleIncomingCommand();
}
}
if (incomingBuffer_.readableBytes() > 0) {
// We still have 1 to 3 bytes from the next frame
assert(incomingBuffer_.readableBytes() < sizeof(uint32_t));
// Restart with a new buffer and copy the the few bytes at the beginning
incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_, DefaultBufferSize);
// At least we need to read 4 bytes to have the complete frame size
uint32_t minReadSize = sizeof(uint32_t) - incomingBuffer_.readableBytes();
asyncReceive(
incomingBuffer_.asio_buffer(),
customAllocReadHandler(
boost::bind(&ClientConnection::handleRead, shared_from_this(), _1, _2,
minReadSize)));
return;
}
// We have read everything we had in the buffer
// Rollback the indexes to reuse the same buffer
incomingBuffer_.reset();
readNextCommand();
}
bool ClientConnection::verifyChecksum(SharedBuffer& incomingBuffer_, proto::BaseCommand& incomingCmd_) {
int readerIndex = incomingBuffer_.readerIndex();
bool isChecksumValid = true;
if (incomingBuffer_.readUnsignedShort() == Commands::magicCrc32c) {
uint32_t storedChecksum = incomingBuffer_.readUnsignedInt();
// compute metadata-payload checksum
int metadataPayloadSize = incomingBuffer_.readableBytes();
uint32_t computedChecksum = computeChecksum(0, incomingBuffer_.data(), metadataPayloadSize);
// verify checksum
isChecksumValid = (storedChecksum == computedChecksum);
if (!isChecksumValid) {
LOG_ERROR("[consumer id " << incomingCmd_.message().consumer_id() //
<< ", message ledger id " << incomingCmd_.message().message_id().ledgerid()//
<< ", entry id " << incomingCmd_.message().message_id().entryid()//
<< "stored-checksum" << storedChecksum << "computedChecksum" << computedChecksum//
<< "] Checksum verification failed");
}
} else {
incomingBuffer_.setReaderIndex(readerIndex);
}
return isChecksumValid;
}
/**
 * Route a received MESSAGE frame to the consumer registered under its
 * consumer id. The mutex is released before invoking the consumer callback
 * so the consumer can safely call back into this connection. Stale weak
 * references (destroyed consumers) are pruned from the map on sight.
 */
void ClientConnection::handleIncomingMessage(const proto::CommandMessage& msg, bool isChecksumValid,
proto::MessageMetadata& msgMetadata,
SharedBuffer& payload) {
LOG_DEBUG(cnxString_ << "Received a message from the server for consumer: " << msg.consumer_id());
Lock lock(mutex_);
ConsumersMap::iterator it = consumers_.find(msg.consumer_id());
if (it != consumers_.end()) {
ConsumerImplPtr consumer = it->second.lock();
if (consumer) {
// Unlock the mutex before notifying the consumer of the
// new received message
lock.unlock();
consumer->messageReceived(shared_from_this(), msg, isChecksumValid, msgMetadata, payload);
} else {
// The weak_ptr expired: the consumer is gone; drop its map entry.
consumers_.erase(msg.consumer_id());
LOG_DEBUG(
cnxString_ << "Ignoring incoming message for already destroyed consumer " << msg.consumer_id());
}
} else {
LOG_DEBUG(cnxString_ << "Got invalid consumer Id in " //
<< msg.consumer_id() << " -- msg: "<< msgMetadata.sequence_id());
}
}
/**
 * Dispatch a non-MESSAGE command from the broker (already parsed into
 * incomingCmd_) according to the connection state.
 *
 * - Pending / Disconnected: command is logged and ignored.
 * - TcpConnected: only CONNECTED is legal; anything else closes the connection.
 * - Ready: full command dispatch (receipts, errors, lookup/stats responses,
 *   close notifications, ping/pong).
 *
 * Locking pattern used throughout: look up the pending entry under mutex_,
 * remove it from the map, unlock, and only then fulfill the promise / notify
 * the producer or consumer — callbacks may re-enter this connection.
 */
void ClientConnection::handleIncomingCommand() {
LOG_DEBUG(
cnxString_ << "Handling incoming command: " << Commands::messageType(incomingCmd_.type()));
switch (state_) {
case Pending: {
LOG_ERROR(cnxString_ << "Connection is not ready yet");
break;
}
case TcpConnected: {
// Handle Pulsar Connected
if (incomingCmd_.type() != BaseCommand::CONNECTED) {
// Wrong cmd
close();
} else {
handlePulsarConnected(incomingCmd_.connected());
}
break;
}
case Disconnected: {
LOG_ERROR(cnxString_ << "Connection already disconnected");
break;
}
case Ready: {
// Handle normal commands
switch (incomingCmd_.type()) {
// Broker acknowledged a published message: forward the ack to the producer.
case BaseCommand::SEND_RECEIPT: {
const CommandSendReceipt& sendReceipt = incomingCmd_.send_receipt();
int producerId = sendReceipt.producer_id();
uint64_t sequenceId = sendReceipt.sequence_id();
LOG_DEBUG(
cnxString_ << "Got receipt for producer: " << producerId << " -- msg: "<< sequenceId);
Lock lock(mutex_);
ProducersMap::iterator it = producers_.find(producerId);
if (it != producers_.end()) {
ProducerImplPtr producer = it->second.lock();
lock.unlock();
if (producer) {
if (!producer->ackReceived(sequenceId)) {
// If the producer fails to process the ack, we need to close the connection to give it a chance to recover from there
close();
}
}
} else {
LOG_ERROR(cnxString_ << "Got invalid producer Id in SendReceipt: " //
<< producerId << " -- msg: "<< sequenceId);
}
break;
}
// Broker rejected a published message. Only checksum errors are
// recoverable (drop the corrupt message); anything else closes the connection.
case BaseCommand::SEND_ERROR: {
const CommandSendError& error = incomingCmd_.send_error();
LOG_WARN(cnxString_ << "Received send error from server: " << error.message());
if (ChecksumError == error.error()) {
long producerId = error.producer_id();
long sequenceId = error.sequence_id();
Lock lock(mutex_);
ProducersMap::iterator it = producers_.find(producerId);
if (it != producers_.end()) {
ProducerImplPtr producer = it->second.lock();
lock.unlock();
if (producer) {
if (!producer->removeCorruptMessage(sequenceId)) {
// If the producer fails to remove corrupt msg, we need to close the connection to give it a chance to recover from there
close();
}
}
}
} else {
close();
}
break;
}
// Generic success for a pending request id: fulfill its promise.
case BaseCommand::SUCCESS: {
const CommandSuccess& success = incomingCmd_.success();
LOG_DEBUG(
cnxString_ << "Received success response from server. req_id: " << success.request_id());
Lock lock(mutex_);
PendingRequestsMap::iterator it = pendingRequests_.find(success.request_id());
if (it != pendingRequests_.end()) {
PendingRequestData requestData = it->second;
pendingRequests_.erase(it);
lock.unlock();
requestData.promise.setValue("");
requestData.timer->cancel();
}
break;
}
// Response to a partitioned-topic metadata lookup.
case BaseCommand::PARTITIONED_METADATA_RESPONSE: {
const CommandPartitionedTopicMetadataResponse& partitionMetadataResponse =
incomingCmd_.partitionmetadataresponse();
LOG_DEBUG(
cnxString_ << "Received partition-metadata response from server. req_id: " << partitionMetadataResponse.request_id());
Lock lock(mutex_);
PendingLookupRequestsMap::iterator it = pendingLookupRequests_.find(
partitionMetadataResponse.request_id());
if (it != pendingLookupRequests_.end()) {
LookupDataResultPromisePtr lookupDataPromise = it->second;
pendingLookupRequests_.erase(it);
numOfPendingLookupRequest_--;
lock.unlock();
if (!partitionMetadataResponse.has_response()
|| (partitionMetadataResponse.response()
== CommandPartitionedTopicMetadataResponse::Failed)) {
if (partitionMetadataResponse.has_error()) {
LOG_ERROR(
cnxString_ << "Failed partition-metadata lookup req_id: " << partitionMetadataResponse.request_id() << " error: " << partitionMetadataResponse.error());
} else {
LOG_ERROR(
cnxString_ << "Failed partition-metadata lookup req_id: " << partitionMetadataResponse.request_id() << " with empty response: ");
}
lookupDataPromise->setFailed(ResultConnectError);
} else {
LookupDataResultPtr lookupResultPtr = boost::make_shared<
LookupDataResult>();
lookupResultPtr->setPartitions(partitionMetadataResponse.partitions());
lookupDataPromise->setValue(lookupResultPtr);
}
} else {
LOG_WARN(
"Received unknown request id from server: " << partitionMetadataResponse.request_id());
}
break;
}
// Response to a consumer-stats request: build BrokerConsumerStats with a
// validity window of consumerStatsTTLMs_ from now.
case BaseCommand::CONSUMER_STATS_RESPONSE: {
const CommandConsumerStatsResponse& consumerStatsResponse = incomingCmd_.consumerstatsresponse();
LOG_DEBUG(
cnxString_
<< "ConsumerStatsResponse command - Received consumer stats response from server. req_id: "
<< consumerStatsResponse.request_id());
Lock lock(mutex_);
PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap_.find(
consumerStatsResponse.request_id());
if (it != pendingConsumerStatsMap_.end()) {
Promise<Result, BrokerConsumerStats> consumerStatsPromise = it->second;
pendingConsumerStatsMap_.erase(it);
lock.unlock();
if (consumerStatsResponse.has_error_code()) {
if (consumerStatsResponse.has_error_message()) {
LOG_ERROR(cnxString_ << " Failed to get consumer stats - "
<< consumerStatsResponse.error_message());
}
consumerStatsPromise.setFailed(getResult(consumerStatsResponse.error_code()));
} else {
LOG_DEBUG(
cnxString_
<< "ConsumerStatsResponse command - Received consumer stats response from server. req_id: "
<< consumerStatsResponse.request_id() << " Stats: ");
boost::posix_time::ptime validTill = now() + milliseconds(consumerStatsTTLMs_);
BrokerConsumerStats brokerStats =
BrokerConsumerStats(validTill,
consumerStatsResponse.msgrateout(),
consumerStatsResponse.msgthroughputout(),
consumerStatsResponse.msgrateredeliver(),
consumerStatsResponse.consumername(),
consumerStatsResponse.availablepermits(),
consumerStatsResponse.unackedmessages(),
consumerStatsResponse.blockedconsumeronunackedmsgs(),
consumerStatsResponse.address(),
consumerStatsResponse.connectedsince(),
consumerStatsResponse.type(),
consumerStatsResponse.msgrateexpired(),
consumerStatsResponse.msgbacklog());
consumerStatsPromise.setValue(brokerStats);
}
} else {
LOG_WARN(
"ConsumerStatsResponse command - Received unknown request id from server: "
<< consumerStatsResponse.request_id());
}
break;
}
// Response to a topic lookup. When this connection is TLS, prefer the
// broker's TLS service URL.
case BaseCommand::LOOKUP_RESPONSE: {
const CommandLookupTopicResponse& lookupTopicResponse = incomingCmd_.lookuptopicresponse();
LOG_DEBUG(
cnxString_ << "Received lookup response from server. req_id: " << lookupTopicResponse.request_id());
Lock lock(mutex_);
PendingLookupRequestsMap::iterator it = pendingLookupRequests_.find(
lookupTopicResponse.request_id());
if (it != pendingLookupRequests_.end()) {
LookupDataResultPromisePtr lookupDataPromise = it->second;
pendingLookupRequests_.erase(it);
numOfPendingLookupRequest_--;
lock.unlock();
if (!lookupTopicResponse.has_response()
|| (lookupTopicResponse.response()
== CommandLookupTopicResponse::Failed)) {
if (lookupTopicResponse.has_error()) {
LOG_ERROR(
cnxString_ << "Failed lookup req_id: " << lookupTopicResponse.request_id() << " error: " << lookupTopicResponse.error());
} else {
LOG_ERROR(
cnxString_ << "Failed lookup req_id: " << lookupTopicResponse.request_id() << " with empty response: ");
}
lookupDataPromise -> setFailed(ResultConnectError);
} else {
LOG_DEBUG(
cnxString_ << "Received lookup response from server. req_id: " << lookupTopicResponse.request_id() //
<< " -- broker-url: " << lookupTopicResponse.brokerserviceurl() << " -- broker-tls-url: "//
<< lookupTopicResponse.brokerserviceurltls() << " authoritative: " << lookupTopicResponse.authoritative()//
<< " redirect: " << lookupTopicResponse.response());
LookupDataResultPtr lookupResultPtr =
boost::make_shared<LookupDataResult>();
if (tlsSocket_) {
lookupResultPtr->setBrokerUrl(lookupTopicResponse.brokerserviceurltls());
} else {
lookupResultPtr->setBrokerUrl(lookupTopicResponse.brokerserviceurl());
}
lookupResultPtr->setBrokerUrlSsl(
lookupTopicResponse.brokerserviceurltls());
lookupResultPtr->setAuthoritative(lookupTopicResponse.authoritative());
lookupResultPtr->setRedirect(
lookupTopicResponse.response()
== CommandLookupTopicResponse::Redirect);
lookupDataPromise->setValue(lookupResultPtr);
}
} else {
LOG_WARN(
"Received unknown request id from server: " << lookupTopicResponse.request_id());
}
break;
}
// Producer creation succeeded: fulfill the pending request's promise with
// the broker-assigned producer name.
case BaseCommand::PRODUCER_SUCCESS: {
const CommandProducerSuccess& producerSuccess = incomingCmd_.producer_success();
LOG_DEBUG(
cnxString_ << "Received success producer response from server. req_id: " << producerSuccess.request_id() //
<< " -- producer name: " << producerSuccess.producer_name());
Lock lock(mutex_);
PendingRequestsMap::iterator it = pendingRequests_.find(
producerSuccess.request_id());
if (it != pendingRequests_.end()) {
PendingRequestData requestData = it->second;
pendingRequests_.erase(it);
lock.unlock();
requestData.promise.setValue(producerSuccess.producer_name());
requestData.timer->cancel();
}
break;
}
// Error for a pending request id: fail its promise with the mapped Result.
case BaseCommand::ERROR: {
const CommandError& error = incomingCmd_.error();
Result result = getResult(error.error());
LOG_WARN(
cnxString_ << "Received error response from server: " << result << " -- req_id: "<< error.request_id());
Lock lock(mutex_);
PendingRequestsMap::iterator it = pendingRequests_.find(error.request_id());
if (it != pendingRequests_.end()) {
PendingRequestData requestData = it->second;
pendingRequests_.erase(it);
lock.unlock();
requestData.promise.setFailed(getResult(error.error()));
requestData.timer->cancel();
} else {
lock.unlock();
}
break;
}
// Broker-initiated producer close: notify the producer so it reconnects.
case BaseCommand::CLOSE_PRODUCER: {
const CommandCloseProducer& closeProducer = incomingCmd_.close_producer();
int producerId = closeProducer.producer_id();
LOG_DEBUG("Broker notification of Closed producer: " << producerId);
Lock lock(mutex_);
ProducersMap::iterator it = producers_.find(producerId);
if (it != producers_.end()) {
ProducerImplPtr producer = it->second.lock();
lock.unlock();
if (producer) {
producer->disconnectProducer();
}
} else {
LOG_ERROR(
cnxString_ << "Got invalid producer Id in closeProducer command: "<< producerId);
}
break;
}
// Broker-initiated consumer close: notify the consumer so it reconnects.
case BaseCommand::CLOSE_CONSUMER: {
const CommandCloseConsumer& closeconsumer = incomingCmd_.close_consumer();
int consumerId = closeconsumer.consumer_id();
LOG_DEBUG("Broker notification of Closed consumer: " << consumerId);
Lock lock(mutex_);
ConsumersMap::iterator it = consumers_.find(consumerId);
if (it != consumers_.end()) {
ConsumerImplPtr consumer = it->second.lock();
lock.unlock();
if (consumer) {
consumer->disconnectConsumer();
}
} else {
LOG_ERROR(
cnxString_ << "Got invalid consumer Id in closeConsumer command: "<< consumerId);
}
break;
}
case BaseCommand::PING: {
// Respond to ping request
LOG_DEBUG(cnxString_ << "Replying to ping command");
sendCommand(Commands::newPong());
break;
}
case BaseCommand::PONG: {
LOG_DEBUG(cnxString_ << "Received response to ping message");
// Our outstanding keep-alive ping has been answered.
havePendingPingRequest_ = false;
break;
}
default: {
LOG_WARN(cnxString_ << "Received invalid message from server");
close();
break;
}
}
}
}
}
/**
 * Issue a consumer-stats request to the broker.
 *
 * Registers the promise under `requestId` (fulfilled by the
 * CONSUMER_STATS_RESPONSE handler or failed by the timeout timer) and sends
 * the command.
 *
 * @return future resolving to the broker's stats, or failing with
 *         ResultNotConnected when the connection is already closed.
 */
Future<Result, BrokerConsumerStats>
ClientConnection::newConsumerStats(const std::string topicName, const std::string subscriptionName,
        uint64_t consumerId, uint64_t requestId) {
    Lock lock(mutex_);
    Promise<Result, BrokerConsumerStats> promise;
    if (isClosed()) {
        lock.unlock();
        LOG_ERROR(cnxString_ << " Client is not connected to the broker");
        promise.setFailed(ResultNotConnected);
        // BUG FIX: previously fell through here, registering the request and
        // writing to the closed connection after the promise had already been
        // failed (risking a double completion). Return immediately instead.
        return promise.getFuture();
    }
    pendingConsumerStatsMap_.insert(std::make_pair(requestId, promise));
    lock.unlock();
    sendCommand(Commands::newConsumerStats(outgoingCmd_, topicName, subscriptionName, consumerId, requestId));
    return promise.getFuture();
}
// Issue a topic-lookup request; the promise is completed by the
// LOOKUP_RESPONSE handler (or failed by newLookup's admission checks).
void ClientConnection::newTopicLookup(const std::string& destinationName, bool authoritative,
const uint64_t requestId,
LookupDataResultPromisePtr promise) {
newLookup(Commands::newLookup(outgoingCmd_, destinationName, authoritative, requestId),
requestId, promise);
}
// Issue a partitioned-topic metadata request; the promise is completed by the
// PARTITIONED_METADATA_RESPONSE handler (or failed by newLookup's checks).
void ClientConnection::newPartitionedMetadataLookup(const std::string& destinationName,
const uint64_t requestId,
LookupDataResultPromisePtr promise) {
newLookup(Commands::newPartitionMetadataRequest(outgoingCmd_, destinationName, requestId), requestId,
promise);
}
/**
 * Register a lookup-style request and send its command.
 *
 * Fails the promise immediately with ResultNotConnected when the connection
 * is closed, or ResultTooManyLookupRequestException when the concurrent
 * lookup limit is reached; otherwise records the promise under `requestId`
 * and writes the command.
 *
 * NOTE: removed the unused local `lookupDataResult` (a make_shared allocation
 * that was never read or passed anywhere — dead code).
 */
void ClientConnection::newLookup(const SharedBuffer& cmd, const uint64_t requestId,
        LookupDataResultPromisePtr promise) {
    Lock lock(mutex_);
    if (isClosed()) {
        lock.unlock();
        promise->setFailed(ResultNotConnected);
        return;
    } else if (numOfPendingLookupRequest_ >= maxPendingLookupRequest_) {
        lock.unlock();
        promise->setFailed(ResultTooManyLookupRequestException);
        return;
    }
    pendingLookupRequests_.insert(std::make_pair(requestId, promise));
    numOfPendingLookupRequest_++;
    lock.unlock();
    sendCommand(cmd);
}
/**
 * Write a command frame to the socket, serializing writes: if no write is in
 * flight the buffer is written immediately; otherwise it is queued and
 * drained by sendPendingCommands from the completion handler.
 */
void ClientConnection::sendCommand(const SharedBuffer& cmd) {
Lock lock(mutex_);
if (pendingWriteOperations_++ == 0) {
// Write immediately to socket
asyncWrite(
cmd.const_asio_buffer(),
customAllocWriteHandler(
boost::bind(&ClientConnection::handleSend, shared_from_this(), _1, cmd)));
} else {
// Queue to send later
pendingWriteBuffers_.push_back(cmd);
}
}
/**
 * Write a produced message to the socket, using the same single-writer
 * queueing scheme as sendCommand. When the write can start immediately the
 * SEND frame is serialized into outgoingBuffer_ now; otherwise the OpSendMsg
 * is queued and serialized later by sendPendingCommands.
 */
void ClientConnection::sendMessage(const OpSendMsg& opSend) {
Lock lock(mutex_);
if (pendingWriteOperations_++ == 0) {
PairSharedBuffer buffer = Commands::newSend(outgoingBuffer_, outgoingCmd_,
opSend.producerId_, opSend.sequenceId_,
getChecksumType(), opSend.msg_);
// Write immediately to socket
asyncWrite(
buffer,
customAllocWriteHandler(
boost::bind(&ClientConnection::handleSendPair, shared_from_this(), _1)));
} else {
// Queue to send later
pendingWriteBuffers_.push_back(opSend);
}
}
/**
 * Completion handler for a single-buffer write started by sendCommand().
 * On success, drains any queued writes; on error, tears the connection down.
 * The SharedBuffer parameter exists only to keep the buffer alive until the
 * write completes.
 */
void ClientConnection::handleSend(const boost::system::error_code& err, const SharedBuffer&) {
    if (!err) {
        sendPendingCommands();
        return;
    }
    LOG_WARN(cnxString_ << "Could not send message on connection: " << err << " " << err.message());
    close();
}
/**
 * Completion handler for a paired-buffer (header + payload) write started by
 * sendMessage() or sendPendingCommands(). On success, drains queued writes;
 * on error, tears the connection down.
 */
void ClientConnection::handleSendPair(const boost::system::error_code& err) {
    if (!err) {
        sendPendingCommands();
        return;
    }
    LOG_WARN(cnxString_ << "Could not send pair message on connection: " << err << " " << err.message());
    close();
}
void ClientConnection::sendPendingCommands() {
Lock lock(mutex_);
if (--pendingWriteOperations_ > 0) {
assert(!pendingWriteBuffers_.empty());
boost::any any = pendingWriteBuffers_.front();
pendingWriteBuffers_.pop_front();
if (any.type() == typeid(SharedBuffer)) {
SharedBuffer buffer = boost::any_cast<SharedBuffer>(any);
asyncWrite(
buffer.const_asio_buffer(),
customAllocWriteHandler(
boost::bind(&ClientConnection::handleSend, shared_from_this(), _1,
buffer)));
} else {
assert(any.type() == typeid(OpSendMsg));
const OpSendMsg& op = boost::any_cast<const OpSendMsg&>(any);
PairSharedBuffer buffer = Commands::newSend(outgoingBuffer_, outgoingCmd_,
op.producerId_, op.sequenceId_,
getChecksumType(), op.msg_);
asyncWrite(
buffer,
customAllocWriteHandler(
boost::bind(&ClientConnection::handleSendPair, shared_from_this(),
_1)));
}
} else {
// No more pending writes
outgoingBuffer_.reset();
}
}
/**
 * Send a request command and return a future completed when the matching
 * response (keyed by requestId) arrives, or failed on timeout/disconnect.
 *
 * A deadline timer armed with operationsTimeout_ fails the promise with
 * ResultTimeout if no response shows up in time; completing the promise
 * earlier makes the later setFailed a no-op.
 */
Future<Result, std::string> ClientConnection::sendRequestWithId(SharedBuffer cmd, int requestId) {
    Lock lock(mutex_);
    if (isClosed()) {
        lock.unlock();
        Promise<Result, std::string> failedPromise;
        failedPromise.setFailed(ResultNotConnected);
        return failedPromise.getFuture();
    }
    PendingRequestData request;
    request.timer = executor_->createDeadlineTimer();
    request.timer->expires_from_now(operationsTimeout_);
    request.timer->async_wait(boost::bind(&ClientConnection::handleRequestTimeout,
                                          shared_from_this(), _1, request));
    pendingRequests_.insert(std::make_pair(requestId, request));
    lock.unlock();
    sendCommand(cmd);
    return request.promise.getFuture();
}
/**
 * Deadline-timer callback for sendRequestWithId(). A non-zero error code
 * means the timer was cancelled (e.g. the response arrived), in which case
 * nothing happens; otherwise the request's promise is failed with
 * ResultTimeout (a no-op if it was already completed).
 */
void ClientConnection::handleRequestTimeout(const boost::system::error_code& ec,
                                            PendingRequestData pendingRequestData) {
    if (ec) {
        return;  // timer cancelled
    }
    pendingRequestData.promise.setFailed(ResultTimeout);
}
/**
 * Keep-alive timer callback, fired every KeepAliveIntervalInSeconds.
 *
 * If the previous ping was never answered (havePendingPingRequest_ still
 * set), the peer is considered dead and the connection is closed. Otherwise
 * a new PING is sent and the timer is re-armed; the pending flag is cleared
 * elsewhere when the PONG is received.
 */
void ClientConnection::handleKeepAliveTimeout() {
    if (isClosed()) {
        return;
    }
    if (!havePendingPingRequest_) {
        // Previous probe was answered: send the next one and re-arm the timer
        LOG_DEBUG(cnxString_ << "Sending ping message");
        havePendingPingRequest_ = true;
        sendCommand(Commands::newPing());
        keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
        keepAliveTimer_->async_wait(
            boost::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
    } else {
        // No PONG since the last PING: give up on this connection
        LOG_WARN(cnxString_ << "Forcing connection to close after keep-alive timeout");
        close();
    }
}
/**
 * Timer callback for periodic consumer-stats requests. Re-arms the stats
 * timer via startConsumerStatsTimer() unless the timer was cancelled.
 */
void ClientConnection::handleConsumerStatsTimeout(const boost::system::error_code& ec,
                                                  std::vector<uint64_t> consumerStatsRequests) {
    if (!ec) {
        startConsumerStatsTimer(consumerStatsRequests);
        return;
    }
    LOG_DEBUG(cnxString_ << " Ignoring timer cancelled event, code[" << ec << "]");
}
/**
 * Tear down the connection: mark it Disconnected, close the socket, and fail
 * every outstanding operation (connect promise, pending requests, pending
 * lookups, pending consumer-stats) so waiting callers are unblocked, then
 * notify all registered producers/consumers of the disconnection.
 *
 * The statement order matters: state_ is flipped under the lock first so new
 * operations see isClosed() before the fail-out sweep begins.
 */
void ClientConnection::close() {
    Lock lock(mutex_);
    state_ = Disconnected;
    boost::system::error_code err;
    // Best-effort close: the error code is deliberately captured and ignored
    socket_->close(err);
    lock.unlock();
    LOG_INFO(cnxString_ << "Connection closed");
    if (keepAliveTimer_) {
        keepAliveTimer_->cancel();
    }
    if (consumerStatsRequestTimer_) {
        consumerStatsRequestTimer_->cancel();
    }
    // NOTE(review): producers_/consumers_ and pendingRequests_ are iterated
    // here without holding mutex_ — presumably safe because state_ is already
    // Disconnected, but confirm no concurrent register/remove can race this.
    for (ProducersMap::iterator it = producers_.begin(); it != producers_.end(); ++it) {
        HandlerBase::handleDisconnection(ResultConnectError, shared_from_this(), it->second);
    }
    for (ConsumersMap::iterator it = consumers_.begin(); it != consumers_.end(); ++it) {
        HandlerBase::handleDisconnection(ResultConnectError, shared_from_this(), it->second);
    }
    // No-op if the connection had already completed its connect promise
    connectPromise_.setFailed(ResultConnectError);
    // Fail all pending operations on the connection
    for (PendingRequestsMap::iterator it = pendingRequests_.begin(); it != pendingRequests_.end(); ++it ) {
        it->second.promise.setFailed(ResultConnectError);
    }
    // Fail all pending lookup-requests on the connection.
    // The maps are swapped out under the lock, then the promises are failed
    // after unlocking so user callbacks never run while mutex_ is held.
    lock.lock();
    PendingLookupRequestsMap pendingLookupRequests;
    pendingLookupRequests_.swap(pendingLookupRequests);
    numOfPendingLookupRequest_ -= pendingLookupRequests.size();
    PendingConsumerStatsMap pendingConsumerStatsMap;
    pendingConsumerStatsMap_.swap(pendingConsumerStatsMap);
    lock.unlock();
    for (PendingLookupRequestsMap::iterator it = pendingLookupRequests.begin(); it != pendingLookupRequests.end(); ++it) {
        it->second->setFailed(ResultConnectError);
    }
    for (PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap.begin();
         it != pendingConsumerStatsMap.end(); ++it) {
        LOG_ERROR(cnxString_ << " Closing Client Connection, please try again later");
        it->second.setFailed(ResultConnectError);
    }
    if (tlsSocket_) {
        tlsSocket_->lowest_layer().close();
    }
}
/// Whether this connection has been torn down (state flipped by close()).
bool ClientConnection::isClosed() const {
    return Disconnected == state_;
}
/// Future completed once the connection handshake succeeds, or failed by
/// close() with ResultConnectError.
Future<Result, ClientConnectionWeakPtr> ClientConnection::getConnectFuture() {
    return connectPromise_.getFuture();
}
void ClientConnection::registerProducer(int producerId, ProducerImplPtr producer) {
Lock lock(mutex_);
producers_.insert(std::make_pair(producerId, producer));
}
void ClientConnection::registerConsumer(int consumerId, ConsumerImplPtr consumer) {
Lock lock(mutex_);
consumers_.insert(std::make_pair(consumerId, consumer));
}
void ClientConnection::removeProducer(int producerId) {
Lock lock(mutex_);
producers_.erase(producerId);
}
void ClientConnection::removeConsumer(int consumerId) {
Lock lock(mutex_);
consumers_.erase(consumerId);
}
/// Address of the broker this connection points at.
const std::string& ClientConnection::brokerAddress() const {
    return address_;
}
/// Human-readable connection label used as a prefix in log messages.
const std::string& ClientConnection::cnxString() const {
    return cnxString_;
}
/// Protocol version advertised by the broker during the handshake.
int ClientConnection::getServerProtocolVersion() const {
    return serverProtocolVersion_;
}
/// Checksum algorithm to use when serializing SEND commands: CRC32C when the
/// broker speaks protocol v6 or later, otherwise no checksum.
Commands::ChecksumType ClientConnection::getChecksumType() const {
    if (getServerProtocolVersion() >= proto::v6) {
        return Commands::Crc32c;
    }
    return Commands::None;
}
}