blob: 3225dee3daf29abf41fd0a42dc8db92650c94552 [file] [log] [blame]
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ProducerImpl.h"
#include "LogUtils.h"
#include "MessageImpl.h"
#include "PulsarApi.pb.h"
#include "Commands.h"
#include "DestinationName.h"
#include "BatchMessageContainer.h"
#include <boost/bind.hpp>
#include <boost/date_time/local_time/local_time.hpp>
using namespace pulsar;
namespace pulsar {
DECLARE_LOG_OBJECT()
OpSendMsg ::OpSendMsg()
: msg_(),
sendCallback_(),
producerId_(),
sequenceId_(),
timeout_() {
}
OpSendMsg::OpSendMsg(uint64_t producerId, uint64_t sequenceId, const Message& msg,
const SendCallback& sendCallback, const ProducerConfiguration& conf)
: msg_(msg),
sendCallback_(sendCallback),
producerId_(producerId),
sequenceId_(sequenceId),
timeout_(now() + milliseconds(conf.getSendTimeout())) {
}
ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic,
const ProducerConfiguration& producerConfiguration)
: HandlerBase(client, topic),
conf_(producerConfiguration),
executor_(client->getIOExecutorProvider()->get()),
pendingMessagesQueue_(conf_.getMaxPendingMessages()),
producerStr_("[" + topic_ + ", " + producerName_ + "] "),
producerId_(client->newProducerId()),
msgSequenceGenerator_(0),
sendTimer_() {
LOG_DEBUG(
"ProducerName - " << producerName_ << " Created producer on topic " << topic_ << " id: " << producerId_);
// boost::ref is used to drop the constantness constraint of make_shared
if(conf_.getBatchingEnabled()) {
batchMessageContainer = boost::make_shared<BatchMessageContainer>(boost::ref(*this));
}
numOfMsgPublished = 0;
numOfSendAsyncCalls = 0;
numOfMsgAckSuccessfully = 0;
}
ProducerImpl::~ProducerImpl() {
LOG_DEBUG(getName() << "~ProducerImpl");
closeAsync(ResultCallback());
printStats();
}
const std::string& ProducerImpl::getTopic() const {
return topic_;
}
void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
Lock lock(mutex_);
if (state_ == Closed) {
lock.unlock();
LOG_DEBUG(getName() << "connectionOpened : Producer is already closed");
return;
}
lock.unlock();
ClientImplPtr client = client_.lock();
int requestId = client->newRequestId();
SharedBuffer cmd = Commands::newProducer(topic_, producerId_, producerName_, requestId);
cnx->sendRequestWithId(cmd, requestId).addListener(
boost::bind(&ProducerImpl::handleCreateProducer, shared_from_this(), cnx, _1, _2));
}
void ProducerImpl::connectionFailed(Result result) {
// Keep a reference to ensure object is kept alive
ProducerImplPtr ptr = shared_from_this();
if (producerCreatedPromise_.setFailed(result)) {
Lock lock(mutex_);
state_ = Failed;
}
}
void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
const std::string& producerName) {
LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " << strResult(result));
if (result == ResultOk) {
// We are now reconnected to broker and clear to send messages. Re-send all pending messages and
// set the cnx pointer so that new messages will be sent immediately
LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString());
Lock lock(mutex_);
cnx->registerProducer(producerId_, shared_from_this());
producerName_ = producerName;
producerStr_ = "[" + topic_ + ", " + producerName_ + "] ";
if (batchMessageContainer) {
batchMessageContainer->producerName_ = producerName_;
}
resendMessages(cnx);
connection_ = cnx;
state_ = Ready;
backoff_.reset();
lock.unlock();
// Initialize the sendTimer only once per producer and only when producer timeout is
// configured. Set the timeout as configured value and asynchronously wait for the
// timeout to happen.
if (!sendTimer_ && conf_.getSendTimeout() > 0) {
sendTimer_ = executor_->createDeadlineTimer();
sendTimer_->expires_from_now(milliseconds(conf_.getSendTimeout()));
sendTimer_->async_wait(
boost::bind(&ProducerImpl::handleSendTimeout, shared_from_this(), _1));
}
producerCreatedPromise_.setValue(shared_from_this());
} else {
// Producer creation failed
if (result == ResultTimeout) {
// Creating the producer has timed out. We need to ensure the broker closes the producer
// in case it was indeed created, otherwise it might prevent new create producer operation,
// since we are not closing the connection
int requestId = client_.lock()->newRequestId();
cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
}
if (producerCreatedPromise_.isComplete()) {
if (result == ResultProducerBlockedQuotaExceededException) {
LOG_WARN(getName() << "Backlog is exceeded on topic. Sending exception to producer");
failPendingMessages(ResultProducerBlockedQuotaExceededException);
} else if (result == ResultProducerBlockedQuotaExceededError) {
LOG_WARN(
getName() << "Producer is blocked on creation because backlog is exceeded on topic");
}
// Producer had already been initially created, we need to retry connecting in any case
LOG_WARN(getName() << "Failed to reconnect producer: " << strResult(result));
scheduleReconnection(shared_from_this());
} else {
// Producer was not yet created, retry to connect to broker if it's possible
if (isRetriableError(result) && (creationTimestamp_ + operationTimeut_ < now())) {
LOG_WARN(getName() << "Temporary error in creating producer: " << strResult(result));
scheduleReconnection(shared_from_this());
} else {
LOG_ERROR(getName() << "Failed to create producer: " << strResult(result));
producerCreatedPromise_.setFailed(result);
Lock lock(mutex_);
state_ = Failed;
}
}
}
}
void ProducerImpl::failPendingMessages(Result result) {
std::vector<OpSendMsg> messagesToFail;
Lock lock(mutex_);
messagesToFail.reserve(pendingMessagesQueue_.size());
LOG_DEBUG(getName() << "# messages in pending queue : " << pendingMessagesQueue_.size());
// Iterate over a copy of the pending messages queue, to trigger the future completion
// without holding producer mutex.
for (MessageQueue::const_iterator it = pendingMessagesQueue_.begin();
it != pendingMessagesQueue_.end(); it++) {
messagesToFail.push_back(*it);
}
BatchMessageContainer::MessageContainerListPtr messageContainerListPtr;
if (batchMessageContainer) {
messageContainerListPtr = batchMessageContainer->messagesContainerListPtr_;
batchMessageContainer->clear();
}
pendingMessagesQueue_.clear();
lock.unlock();
for (std::vector<OpSendMsg>::const_iterator it = messagesToFail.begin();
it != messagesToFail.end(); it++) {
it->sendCallback_(result, it->msg_);
}
// this function can handle null pointer
BatchMessageContainer::batchMessageCallBack(ResultTimeout, messageContainerListPtr);
}
void ProducerImpl::resendMessages(ClientConnectionPtr cnx) {
if (pendingMessagesQueue_.empty()) {
return;
}
LOG_DEBUG(getName() << "Re-Sending " << pendingMessagesQueue_.size() << " messages to server");
for (MessageQueue::const_iterator it = pendingMessagesQueue_.begin();
it != pendingMessagesQueue_.end(); ++it) {
LOG_DEBUG(getName() << "Re-Sending " << it->sequenceId_);
cnx->sendMessage(*it);
}
}
void ProducerImpl::setMessageMetadata(const Message &msg, const uint64_t& sequenceId, const uint32_t& uncompressedSize) {
// Call this function after acquiring the mutex_
proto::MessageMetadata& msgMetadata = msg.impl_->metadata;
if (! batchMessageContainer) {
msgMetadata.set_producer_name(producerName_);
}
msgMetadata.set_publish_time(currentTimeMillis());
msgMetadata.set_sequence_id(sequenceId);
if (conf_.getCompressionType() != CompressionNone) {
msgMetadata.set_compression(
CompressionCodecProvider::convertType(conf_.getCompressionType()));
msgMetadata.set_uncompressed_size(uncompressedSize);
}
}
void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
if (msg.getLength() > Commands::MaxMessageSize) {
callback(ResultMessageTooBig, msg);
return;
}
// Reserve a spot in the messages queue before acquiring the ProducerImpl
// mutex. When the queue is full, this call will block until a spot is
// available.
if (conf_.getBlockIfQueueFull()) {
pendingMessagesQueue_.reserve(1);
}
// Compress the payload if required
SharedBuffer& payload = msg.impl_->payload;
uint32_t uncompressedSize = payload.readableBytes();
if (! batchMessageContainer) {
// If batching is enabled we compress all the payloads together before sending the batch
payload = CompressionCodecProvider::getCodec(conf_.getCompressionType()).encode(payload);
}
Lock lock(mutex_);
numOfSendAsyncCalls++;
if (state_ != Ready) {
lock.unlock();
if (conf_.getBlockIfQueueFull()) {
pendingMessagesQueue_.release(1);
}
callback(ResultAlreadyClosed, msg);
return;
}
if (msg.impl_->metadata.has_producer_name()) {
// Message had already been sent before
lock.unlock();
if (conf_.getBlockIfQueueFull()) {
pendingMessagesQueue_.release(1);
}
callback(ResultInvalidMessage, msg);
return;
}
setMessageMetadata(msg, msgSequenceGenerator_++, uncompressedSize);
// reserving a spot and going forward - not blocking
if (!conf_.getBlockIfQueueFull() && !pendingMessagesQueue_.tryReserve(1)) {
LOG_DEBUG(getName() << " - Producer Queue is full");
// If queue is full sending the batch immediately, no point waiting till batchMessageimeout
if (batchMessageContainer) {
LOG_DEBUG(getName() << " - sending batch message immediately");
batchMessageContainer->sendMessage();
}
lock.unlock();
callback(ResultProducerQueueIsFull, msg);
return;
}
// If we reach this point then you have a reserved spot on the queue
if (batchMessageContainer) { // Batching is enabled
batchMessageContainer->add(msg, callback);
return;
}
sendMessage(msg, callback);
}
// Precondition -
// a. we have a reserved spot on the queue
// b. call this function after acquiring the ProducerImpl mutex_
void ProducerImpl::sendMessage(const Message& msg, SendCallback callback) {
const uint64_t& sequenceId = msg.impl_->metadata.sequence_id();
LOG_DEBUG(
getName() << "Sending msg: " << sequenceId << " -- queue_size: " << pendingMessagesQueue_.size());
OpSendMsg op(producerId_, sequenceId, msg, callback, conf_);
LOG_DEBUG("Inserting data to pendingMessagesQueue_");
pendingMessagesQueue_.push(op, true);
numOfMsgPublished++;
LOG_DEBUG("Completed Inserting data to pendingMessagesQueue_");
ClientConnectionPtr cnx = getCnx().lock();
if (cnx) {
// If we do have a connection, the message is sent immediately, otherwise
// we'll try again once a new connection is established
LOG_DEBUG(getName() << "Sending msg immediately - seq: " << sequenceId);
cnx->sendMessage(op);
} else {
LOG_DEBUG(getName() << "Connection is not ready - seq: " << sequenceId);
}
}
void ProducerImpl::batchMessageTimeoutHandler(const boost::system::error_code& ec) {
if (ec) {
LOG_DEBUG(getName() << " Ignoring timer cancelled event, code[" << ec <<"]");
return;
}
LOG_DEBUG(getName() << " - Batch Message Timer expired");
Lock lock(mutex_);
batchMessageContainer->sendMessage();
}
void ProducerImpl::printStats() {
if (batchMessageContainer) {
LOG_INFO("Producer - " << producerStr_ << ", [numOfMsgPublished = " << numOfMsgPublished
<< "] [numOfMsgAckSuccessfully = " << numOfMsgAckSuccessfully
<< "] [numOfSendAsyncCalls =" << numOfSendAsyncCalls << "] [batchMessageContainer = "
<< *batchMessageContainer << "]");
} else {
LOG_INFO("Producer - " << producerStr_ << ", [numOfMsgPublished = " << numOfMsgPublished
<< "] [numOfMsgAckSuccessfully = " << numOfMsgAckSuccessfully
<< "] [numOfSendAsyncCalls =" << numOfSendAsyncCalls << "] [batching = off]");
}
}
void ProducerImpl::closeAsync(CloseCallback callback) {
Lock lock(mutex_);
if (state_ != Ready) {
lock.unlock();
if (!callback.empty()) {
callback(ResultAlreadyClosed);
}
return;
}
LOG_DEBUG(getName() << "Closing producer");
state_ = Closing;
ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
lock.unlock();
if (!callback.empty()) {
callback(ResultOk);
}
return;
}
// Detach the producer from the connection to avoid sending any other
// message from the producer
connection_.reset();
lock.unlock();
ClientImplPtr client = client_.lock();
if (!client) {
// Client was already destroyed
if (!callback.empty()) {
callback(ResultOk);
}
return;
}
int requestId = client->newRequestId();
Future<Result, std::string> future = cnx->sendRequestWithId(
Commands::newCloseProducer(producerId_, requestId), requestId);
if (!callback.empty()) {
future.addListener(
boost::bind(&ProducerImpl::handleClose, shared_from_this(), _1, callback));
}
}
void ProducerImpl::handleClose(Result result, ResultCallback callback) {
if (result == ResultOk) {
Lock lock(mutex_);
state_ = Closed;
LOG_INFO(getName() << "Closed producer");
ClientConnectionPtr cnx = getCnx().lock();
if (cnx) {
cnx->removeProducer(producerId_);
}
} else {
LOG_ERROR(getName() << "Failed to close producer: " << strResult(result));
}
callback(result);
}
Future<Result, ProducerImplBaseWeakPtr> ProducerImpl::getProducerCreatedFuture() {
return producerCreatedPromise_.getFuture();
}
uint64_t ProducerImpl::getProducerId() const {
return producerId_;
}
void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) {
if (err == boost::asio::error::operation_aborted) {
LOG_DEBUG(getName() << "Timer cancelled: " << err.message());
return;
} else if (err) {
LOG_ERROR(getName() << "Timer error: " << err.message());
return;
}
OpSendMsg msg;
if (!pendingMessagesQueue_.peek(msg)) {
// If there are no pending messages, reset the timeout to the configured value.
sendTimer_->expires_from_now(milliseconds(conf_.getSendTimeout()));
LOG_DEBUG(getName() << "Producer timeout triggered on empty pending message queue");
} else {
// If there is at least one message, calculate the diff between the message timeout and
// the current time.
time_duration diff = msg.timeout_ - now();
if (diff.total_milliseconds() <= 0) {
// The diff is less than or equal to zero, meaning that the message has been expired.
LOG_DEBUG(getName() << "Timer expired. Calling timeout callbacks.");
failPendingMessages(ResultTimeout);
// Since the pending queue is cleared now, set timer to expire after configured value.
sendTimer_->expires_from_now(milliseconds(conf_.getSendTimeout()));
} else {
// The diff is greater than zero, set the timeout to the diff value
LOG_DEBUG(getName() << "Timer hasn't expired yet, setting new timeout " << diff);
sendTimer_->expires_from_now(diff);
}
}
// Asynchronously wait for the timeout to trigger
sendTimer_->async_wait(boost::bind(&ProducerImpl::handleSendTimeout, shared_from_this(), _1));
}
bool ProducerImpl::removeCorruptMessage(uint64_t sequenceId) {
OpSendMsg op;
Lock lock(mutex_);
bool havePendingAck = pendingMessagesQueue_.peek(op);
if (!havePendingAck) {
LOG_DEBUG(getName() << " -- SequenceId - " << sequenceId << "]" //
<< "Got send failure for expired message, ignoring it.");
return true;
}
uint64_t expectedSequenceId = op.sequenceId_;
if (sequenceId > expectedSequenceId) {
LOG_WARN(getName() << "Got ack failure for msg " << sequenceId //
<< " expecting: " << expectedSequenceId << " queue size="//
<< pendingMessagesQueue_.size() << " producer: " << producerId_);
return false;
} else if (sequenceId < expectedSequenceId) {
LOG_DEBUG(getName() << "Corrupt message is already timed out. Ignoring msg " << sequenceId);
return true;
} else {
LOG_DEBUG(getName() << "Remove corrupt message from queue " << sequenceId);
pendingMessagesQueue_.pop();
numOfMsgAckSuccessfully++;
if (op.msg_.impl_->metadata.has_num_messages_in_batch()) {
// batch message - need to release more spots
// -1 since the pushing batch message into the queue already released a spot
pendingMessagesQueue_.release(op.msg_.impl_->metadata.num_messages_in_batch() - 1);
}
lock.unlock();
if (op.sendCallback_) {
// to protect from client callback exception
try {
op.sendCallback_(ResultChecksumError, op.msg_);
} catch (const std::exception& e) {
LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
}
}
return true;
}
}
bool ProducerImpl::ackReceived(uint64_t sequenceId) {
OpSendMsg op;
Lock lock(mutex_);
bool havePendingAck = pendingMessagesQueue_.peek(op);
if (!havePendingAck) {
LOG_DEBUG(getName() << " -- SequenceId - " << sequenceId << "]" //
<< "Got an SEND_ACK for expired message, ignoring it.");
return true;
}
uint64_t expectedSequenceId = op.sequenceId_;
if (sequenceId > expectedSequenceId) {
LOG_WARN(getName() << "Got ack for msg " << sequenceId //
<< " expecting: " << expectedSequenceId << " queue size="//
<< pendingMessagesQueue_.size() << " producer: " << producerId_);
return false;
} else if (sequenceId < expectedSequenceId) {
// Ignoring the ack since it's referring to a message that has already timed out.
LOG_DEBUG(getName() << "Got ack for timed out msg " << sequenceId //
<< " last-seq: " << expectedSequenceId << " producer: " << producerId_);
return true;
} else {
// Message was persisted correctly
LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
pendingMessagesQueue_.pop();
numOfMsgAckSuccessfully++;
if (op.msg_.impl_->metadata.has_num_messages_in_batch()) {
// batch message - need to release more spots
// -1 since the pushing batch message into the queue already released a spot
pendingMessagesQueue_.release(op.msg_.impl_->metadata.num_messages_in_batch() - 1);
}
lock.unlock();
if (op.sendCallback_) {
try {
op.sendCallback_(ResultOk, op.msg_);
} catch (const std::exception& e) {
LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
}
}
return true;
}
}
void ProducerImpl::disconnectProducer() {
LOG_DEBUG("Broker notification of Closed producer: " << producerId_);
connection_.reset();
scheduleReconnection(shared_from_this());
}
const std::string& ProducerImpl::getName() const{
return producerStr_;
}
void ProducerImpl::start() {
HandlerBase::start();
}
void ProducerImpl::shutdown() {
Lock lock(mutex_);
state_ = Closed;
sendTimer_.reset();
producerCreatedPromise_.setFailed(ResultAlreadyClosed);
}
bool ProducerImplCmp::operator()(const ProducerImplPtr &a, const ProducerImplPtr &b) const {
return a->getProducerId() < b->getProducerId();
}
bool ProducerImpl::isClosed() {
Lock lock(mutex_);
return state_ == Closed;
}
}
/* namespace pulsar */