/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "ProducerImpl.h"
#include "LogUtils.h"
#include "MessageImpl.h"
#include "TimeUtils.h"
#include "PulsarApi.pb.h"
#include "Commands.h"
#include "BatchMessageContainerBase.h"
#include "BatchMessageContainer.h"
#include "BatchMessageKeyBasedContainer.h"
#include <boost/date_time/local_time/local_time.hpp>
#include <lib/TopicName.h>
#include "MessageAndCallbackBatch.h"
namespace pulsar {
DECLARE_LOG_OBJECT()
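// Collects the send callbacks of pending messages so that they can be completed
// with a failure result after the producer mutex has been released.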
struct ProducerImpl::PendingCallbacks {
std::vector<OpSendMsg> opSendMsgs;
void complete(Result result) {
for (const auto& opSendMsg : opSendMsgs) {
opSendMsg.sendCallback_(result, opSendMsg.msg_.getMessageId());
}
}
};
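// The reconnection backoff starts at 100 ms and caps at 60 s; its mandatory-stop
// time is derived from the configured send timeout minus a 100 ms margin (floored
// at 100 ms).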
ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const ProducerConfiguration& conf,
int32_t partition)
: HandlerBase(
client, topic,
Backoff(milliseconds(100), seconds(60), milliseconds(std::max(100, conf.getSendTimeout() - 100)))),
conf_(conf),
executor_(client->getIOExecutorProvider()->get()),
pendingMessagesQueue_(conf_.getMaxPendingMessages()),
partition_(partition),
producerName_(conf_.getProducerName()),
userProvidedProducerName_(false),
producerStr_("[" + topic_ + ", " + producerName_ + "] "),
producerId_(client->newProducerId()),
msgSequenceGenerator_(0),
dataKeyGenIntervalSec_(4 * 60 * 60) {
LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic_
<< " id: " << producerId_);
int64_t initialSequenceId = conf.getInitialSequenceId();
lastSequenceIdPublished_ = initialSequenceId;
msgSequenceGenerator_ = initialSequenceId + 1;
if (!producerName_.empty()) {
userProvidedProducerName_ = true;
}
unsigned int statsIntervalInSeconds = client->getClientConfig().getStatsIntervalInSeconds();
if (statsIntervalInSeconds) {
producerStatsBasePtr_ =
std::make_shared<ProducerStatsImpl>(producerStr_, executor_, statsIntervalInSeconds);
} else {
producerStatsBasePtr_ = std::make_shared<ProducerStatsDisabled>();
}
if (conf_.isEncryptionEnabled()) {
std::ostringstream logCtxStream;
logCtxStream << "[" << topic_ << ", " << producerName_ << ", " << producerId_ << "]";
std::string logCtx = logCtxStream.str();
msgCrypto_ = std::make_shared<MessageCrypto>(logCtx, true);
msgCrypto_->addPublicKeyCipher(conf_.getEncryptionKeys(), conf_.getCryptoKeyReader());
}
if (conf_.getBatchingEnabled()) {
switch (conf_.getBatchingType()) {
case ProducerConfiguration::DefaultBatching:
batchMessageContainer_.reset(new BatchMessageContainer(*this));
break;
case ProducerConfiguration::KeyBasedBatching:
batchMessageContainer_.reset(new BatchMessageKeyBasedContainer(*this));
break;
default: // never reached here
LOG_ERROR("Unknown batching type: " << conf_.getBatchingType());
return;
}
batchTimer_ = executor_->createDeadlineTimer();
}
}
ProducerImpl::~ProducerImpl() {
LOG_DEBUG(getName() << "~ProducerImpl");
cancelTimers();
printStats();
if (state_ == Ready) {
LOG_WARN(getName() << "Destroyed producer which was not properly closed");
}
}
const std::string& ProducerImpl::getTopic() const { return topic_; }
const std::string& ProducerImpl::getProducerName() const { return producerName_; }
int64_t ProducerImpl::getLastSequenceId() const { return lastSequenceIdPublished_; }
const std::string& ProducerImpl::getSchemaVersion() const { return schemaVersion_; }
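// Timer callback that rotates the data encryption key by re-adding the configured
// public key ciphers, then re-arms the timer for the next refresh interval.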
void ProducerImpl::refreshEncryptionKey(const boost::system::error_code& ec) {
if (ec) {
LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]");
return;
}
msgCrypto_->addPublicKeyCipher(conf_.getEncryptionKeys(), conf_.getCryptoKeyReader());
dataKeyGenTImer_->expires_from_now(boost::posix_time::seconds(dataKeyGenIntervalSec_));
dataKeyGenTImer_->async_wait(
std::bind(&pulsar::ProducerImpl::refreshEncryptionKey, shared_from_this(), std::placeholders::_1));
}
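// Called by HandlerBase once a (re)connection to the broker has been established.
// Registers this producer on the connection by sending a CommandProducer request;
// the response is handled in handleCreateProducer().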
void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
Lock lock(mutex_);
if (state_ == Closed) {
lock.unlock();
LOG_DEBUG(getName() << "connectionOpened : Producer is already closed");
return;
}
lock.unlock();
ClientImplPtr client = client_.lock();
int requestId = client->newRequestId();
SharedBuffer cmd = Commands::newProducer(topic_, producerId_, producerName_, requestId,
conf_.getProperties(), conf_.getSchema(), epoch_,
userProvidedProducerName_, conf_.isEncryptionEnabled());
cnx->sendRequestWithId(cmd, requestId)
.addListener(std::bind(&ProducerImpl::handleCreateProducer, shared_from_this(), cnx,
std::placeholders::_1, std::placeholders::_2));
}
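// Called when the connection to the broker could not be established. The state is
// moved to Failed only if the producer was never successfully created (i.e. the
// creation promise had not completed yet).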
void ProducerImpl::connectionFailed(Result result) {
// Keep a reference to ensure object is kept alive
ProducerImplPtr ptr = shared_from_this();
if (producerCreatedPromise_.setFailed(result)) {
Lock lock(mutex_);
state_ = Failed;
}
}
void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
const ResponseData& responseData) {
LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " << strResult(result));
if (result == ResultOk) {
// We are now reconnected to broker and clear to send messages. Re-send all pending messages and
// set the cnx pointer so that new messages will be sent immediately
LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString());
Lock lock(mutex_);
cnx->registerProducer(producerId_, shared_from_this());
producerName_ = responseData.producerName;
schemaVersion_ = responseData.schemaVersion;
producerStr_ = "[" + topic_ + ", " + producerName_ + "] ";
if (lastSequenceIdPublished_ == -1 && conf_.getInitialSequenceId() == -1) {
lastSequenceIdPublished_ = responseData.lastSequenceId;
msgSequenceGenerator_ = lastSequenceIdPublished_ + 1;
}
resendMessages(cnx);
connection_ = cnx;
state_ = Ready;
backoff_.reset();
lock.unlock();
if (!dataKeyGenTImer_ && conf_.isEncryptionEnabled()) {
dataKeyGenTImer_ = executor_->createDeadlineTimer();
dataKeyGenTImer_->expires_from_now(boost::posix_time::seconds(dataKeyGenIntervalSec_));
dataKeyGenTImer_->async_wait(std::bind(&pulsar::ProducerImpl::refreshEncryptionKey,
shared_from_this(), std::placeholders::_1));
}
// Initialize the sendTimer only once per producer and only when producer timeout is
// configured. Set the timeout as configured value and asynchronously wait for the
// timeout to happen.
if (!sendTimer_ && conf_.getSendTimeout() > 0) {
sendTimer_ = executor_->createDeadlineTimer();
sendTimer_->expires_from_now(milliseconds(conf_.getSendTimeout()));
sendTimer_->async_wait(
std::bind(&ProducerImpl::handleSendTimeout, shared_from_this(), std::placeholders::_1));
}
producerCreatedPromise_.setValue(shared_from_this());
} else {
// Producer creation failed
if (result == ResultTimeout) {
// Creating the producer has timed out. We need to ensure the broker closes the producer
// in case it was indeed created; otherwise it might prevent future create-producer
// operations, since we are not closing the connection.
int requestId = client_.lock()->newRequestId();
cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
}
if (producerCreatedPromise_.isComplete()) {
if (result == ResultProducerBlockedQuotaExceededException) {
LOG_WARN(getName() << "Backlog is exceeded on topic. Sending exception to producer");
failPendingMessages(ResultProducerBlockedQuotaExceededException);
} else if (result == ResultProducerBlockedQuotaExceededError) {
LOG_WARN(getName() << "Producer is blocked on creation because backlog is exceeded on topic");
}
// The producer had already been created, so we need to retry connecting in any case
LOG_WARN(getName() << "Failed to reconnect producer: " << strResult(result));
scheduleReconnection(shared_from_this());
} else {
// The producer was not yet created; retry connecting to the broker if possible
if (isRetriableError(result) && (creationTimestamp_ + operationTimeut_ < TimeUtils::now())) {
LOG_WARN(getName() << "Temporary error in creating producer: " << strResult(result));
scheduleReconnection(shared_from_this());
} else {
LOG_ERROR(getName() << "Failed to create producer: " << strResult(result));
producerCreatedPromise_.setFailed(result);
Lock lock(mutex_);
state_ = Failed;
}
}
}
}
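// Drains the pending messages queue (and any partially filled batch) and returns
// their callbacks, so the caller can complete them with a failure result without
// holding the producer mutex.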
std::shared_ptr<ProducerImpl::PendingCallbacks> ProducerImpl::getPendingCallbacksWhenFailed() {
auto callbacks = std::make_shared<PendingCallbacks>();
callbacks->opSendMsgs.reserve(pendingMessagesQueue_.size());
LOG_DEBUG(getName() << "# messages in pending queue : " << pendingMessagesQueue_.size());
// Copy the pending messages out of the queue so that their callbacks can be
// completed later without holding the producer mutex.
for (MessageQueue::const_iterator it = pendingMessagesQueue_.begin(); it != pendingMessagesQueue_.end();
it++) {
callbacks->opSendMsgs.push_back(*it);
}
if (batchMessageContainer_) {
OpSendMsg opSendMsg;
if (batchMessageContainer_->createOpSendMsg(opSendMsg) == ResultOk) {
callbacks->opSendMsgs.emplace_back(opSendMsg);
}
batchMessageContainer_->clear();
}
pendingMessagesQueue_.clear();
return callbacks;
}
std::shared_ptr<ProducerImpl::PendingCallbacks> ProducerImpl::getPendingCallbacksWhenFailedWithLock() {
Lock lock(mutex_);
return getPendingCallbacksWhenFailed();
}
void ProducerImpl::failPendingMessages(Result result) {
getPendingCallbacksWhenFailedWithLock()->complete(result);
}
void ProducerImpl::resendMessages(ClientConnectionPtr cnx) {
if (pendingMessagesQueue_.empty()) {
return;
}
LOG_DEBUG(getName() << "Re-Sending " << pendingMessagesQueue_.size() << " messages to server");
for (MessageQueue::const_iterator it = pendingMessagesQueue_.begin(); it != pendingMessagesQueue_.end();
++it) {
LOG_DEBUG(getName() << "Re-Sending " << it->sequenceId_);
cnx->sendMessage(*it);
}
}
void ProducerImpl::setMessageMetadata(const Message& msg, const uint64_t& sequenceId,
const uint32_t& uncompressedSize) {
// Call this function after acquiring the mutex_
proto::MessageMetadata& msgMetadata = msg.impl_->metadata;
msgMetadata.set_producer_name(producerName_);
msgMetadata.set_publish_time(TimeUtils::currentTimeMillis());
msgMetadata.set_sequence_id(sequenceId);
if (conf_.getCompressionType() != CompressionNone) {
msgMetadata.set_compression(CompressionCodecProvider::convertType(conf_.getCompressionType()));
msgMetadata.set_uncompressed_size(uncompressedSize);
}
if (!this->getSchemaVersion().empty()) {
msgMetadata.set_schema_version(this->getSchemaVersion());
}
}
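// Wraps the user-supplied send callback so that publish latency statistics are
// recorded before the callback is invoked.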
void ProducerImpl::statsCallBackHandler(Result res, const MessageId& msgId, SendCallback callback,
boost::posix_time::ptime publishTime) {
producerStatsBasePtr_->messageReceived(res, publishTime);
if (callback) {
callback(res, msgId);
}
}
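// Flushes any messages currently held in the batch container. When batching is
// disabled nothing is buffered locally, so the flush completes immediately.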
void ProducerImpl::flushAsync(FlushCallback callback) {
if (batchMessageContainer_) {
Lock lock(mutex_);
auto failures = batchMessageAndSend(callback);
lock.unlock();
failures.complete();
} else {
callback(ResultOk);
}
}
void ProducerImpl::triggerFlush() {
if (batchMessageContainer_) {
Lock lock(mutex_);
auto failures = batchMessageAndSend();
lock.unlock();
failures.complete();
}
}
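// Entry point for publishing a message. When batching is disabled the payload is
// compressed and encrypted here; otherwise the whole batch is processed at flush
// time. A spot is reserved in the pending messages queue, then the message is
// either added to the batch container or sent directly on the connection.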
void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
producerStatsBasePtr_->messageSent(msg);
SendCallback cb =
std::bind(&ProducerImpl::statsCallBackHandler, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, callback, boost::posix_time::microsec_clock::universal_time());
// Compress the payload if required
SharedBuffer& payload = msg.impl_->payload;
uint32_t uncompressedSize = payload.readableBytes();
uint32_t payloadSize = uncompressedSize;
ClientConnectionPtr cnx = getCnx().lock();
if (!batchMessageContainer_) {
// Batching is disabled: compress the payload here. When batching is enabled, all
// payloads in a batch are compressed together at flush time.
payload = CompressionCodecProvider::getCodec(conf_.getCompressionType()).encode(payload);
payloadSize = payload.readableBytes();
// Encrypt the payload if enabled
SharedBuffer encryptedPayload;
if (!encryptMessage(msg.impl_->metadata, payload, encryptedPayload)) {
cb(ResultCryptoError, msg.getMessageId());
return;
}
payload = encryptedPayload;
if (payloadSize > ClientConnection::getMaxMessageSize()) {
LOG_DEBUG(getName() << " - compressed Message payload size" << payloadSize << "cannot exceed "
<< ClientConnection::getMaxMessageSize() << " bytes");
cb(ResultMessageTooBig, msg.getMessageId());
return;
}
}
// Reserve a spot in the messages queue before acquiring the ProducerImpl
// mutex. When the queue is full, this call will block until a spot is
// available.
if (conf_.getBlockIfQueueFull()) {
pendingMessagesQueue_.reserve(1);
}
Lock lock(mutex_);
if (state_ != Ready) {
lock.unlock();
if (conf_.getBlockIfQueueFull()) {
pendingMessagesQueue_.release(1);
}
cb(ResultAlreadyClosed, msg.getMessageId());
return;
}
if (msg.impl_->metadata.has_producer_name()) {
// Message had already been sent before
lock.unlock();
if (conf_.getBlockIfQueueFull()) {
pendingMessagesQueue_.release(1);
}
cb(ResultInvalidMessage, msg.getMessageId());
return;
}
uint64_t sequenceId;
if (!msg.impl_->metadata.has_sequence_id()) {
sequenceId = msgSequenceGenerator_++;
} else {
sequenceId = msg.impl_->metadata.sequence_id();
}
setMessageMetadata(msg, sequenceId, uncompressedSize);
// Non-blocking path: try to reserve a spot; if the queue is full, fail the send immediately
if (!conf_.getBlockIfQueueFull() && !pendingMessagesQueue_.tryReserve(1)) {
LOG_DEBUG(getName() << " - Producer Queue is full");
// The queue is full, so send the current batch immediately; there is no point waiting for the batch timeout
if (batchMessageContainer_) {
LOG_DEBUG(getName() << " - sending batch message immediately");
auto failures = batchMessageAndSend();
lock.unlock();
failures.complete();
} else {
lock.unlock();
}
cb(ResultProducerQueueIsFull, msg.getMessageId());
return;
}
// If we reach this point, a spot has been reserved on the queue
if (batchMessageContainer_ && !msg.impl_->metadata.has_deliver_at_time()) {
// Batching is enabled and the message is not delayed
if (!batchMessageContainer_->hasEnoughSpace(msg)) {
batchMessageAndSend().complete();
}
bool isFirstMessage = batchMessageContainer_->isFirstMessageToAdd(msg);
bool isFull = batchMessageContainer_->add(msg, cb);
if (isFirstMessage) {
batchTimer_->expires_from_now(
boost::posix_time::milliseconds(conf_.getBatchingMaxPublishDelayMs()));
batchTimer_->async_wait(std::bind(&ProducerImpl::batchMessageTimeoutHandler, shared_from_this(),
std::placeholders::_1));
}
if (isFull) {
auto failures = batchMessageAndSend();
lock.unlock();
failures.complete();
}
} else {
sendMessage(OpSendMsg{msg, cb, producerId_, sequenceId, conf_.getSendTimeout()});
}
}
// Must be called while `mutex_` is held
PendingFailures ProducerImpl::batchMessageAndSend(const FlushCallback& flushCallback) {
PendingFailures failures;
LOG_DEBUG("batchMessageAndSend " << *batchMessageContainer_);
batchTimer_->cancel();
if (PULSAR_UNLIKELY(batchMessageContainer_->isEmpty())) {
if (flushCallback) {
flushCallback(ResultOk);
}
} else {
const size_t numBatches = batchMessageContainer_->getNumBatches();
if (numBatches == 1) {
OpSendMsg opSendMsg;
Result result = batchMessageContainer_->createOpSendMsg(opSendMsg, flushCallback);
if (result == ResultOk) {
sendMessage(opSendMsg);
} else {
// A spot has been reserved for this batch, but the batch failed to be pushed to the queue, so
// we need to release the spot manually
LOG_ERROR("batchMessageAndSend | Failed to createOpSendMsg: " << result);
pendingMessagesQueue_.release(1);
failures.add(std::bind(opSendMsg.sendCallback_, result, MessageId{}));
}
} else if (numBatches > 1) {
std::vector<OpSendMsg> opSendMsgs;
std::vector<Result> results = batchMessageContainer_->createOpSendMsgs(opSendMsgs, flushCallback);
for (size_t i = 0; i < results.size(); i++) {
if (results[i] == ResultOk) {
sendMessage(opSendMsgs[i]);
} else {
// A spot has been reserved for this batch, but the batch failed to be pushed to the
// queue, so we need to release the spot manually
LOG_ERROR("batchMessageAndSend | Failed to createOpSendMsgs[" << i
<< "]: " << results[i]);
pendingMessagesQueue_.release(1);
failures.add(std::bind(opSendMsgs[i].sendCallback_, results[i], MessageId{}));
}
}
} // else numBatches is 0, do nothing
}
batchMessageContainer_->clear();
return failures;
}
// Preconditions:
// a. a spot has been reserved on the pending messages queue
// b. the ProducerImpl mutex_ is held by the caller
void ProducerImpl::sendMessage(const OpSendMsg& op) {
const auto sequenceId = op.msg_.impl_->metadata.sequence_id();
LOG_DEBUG("Inserting data to pendingMessagesQueue_");
pendingMessagesQueue_.push(op, true);
LOG_DEBUG("Completed Inserting data to pendingMessagesQueue_");
ClientConnectionPtr cnx = getCnx().lock();
if (cnx) {
// If we do have a connection, the message is sent immediately, otherwise
// we'll try again once a new connection is established
LOG_DEBUG(getName() << "Sending msg immediately - seq: " << sequenceId);
cnx->sendMessage(op);
} else {
LOG_DEBUG(getName() << "Connection is not ready - seq: " << sequenceId);
}
}
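// Fired when the configured batching max publish delay elapses; flushes whatever
// is currently accumulated in the batch container.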
void ProducerImpl::batchMessageTimeoutHandler(const boost::system::error_code& ec) {
if (ec) {
LOG_DEBUG(getName() << " Ignoring timer cancelled event, code[" << ec << "]");
return;
}
LOG_DEBUG(getName() << " - Batch Message Timer expired");
Lock lock(mutex_);
auto failures = batchMessageAndSend();
lock.unlock();
failures.complete();
}
void ProducerImpl::printStats() {
if (batchMessageContainer_) {
LOG_INFO("Producer - " << producerStr_ << ", [batchMessageContainer = " << *batchMessageContainer_
<< "]");
} else {
LOG_INFO("Producer - " << producerStr_ << ", [batching = off]");
}
}
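// Asynchronously closes the producer: cancels the timers, detaches the producer
// from the connection, sends a CommandCloseProducer to the broker and completes
// the callback once the broker confirms (or immediately if there is no active
// connection).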
void ProducerImpl::closeAsync(CloseCallback callback) {
Lock lock(mutex_);
// Keep a reference to ensure object is kept alive
ProducerImplPtr ptr = shared_from_this();
cancelTimers();
if (state_ != Ready) {
lock.unlock();
if (callback) {
callback(ResultAlreadyClosed);
}
return;
}
LOG_INFO(getName() << "Closing producer for topic " << topic_);
state_ = Closing;
ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
state_ = Closed;
lock.unlock();
if (callback) {
callback(ResultOk);
}
return;
}
// Detach the producer from the connection to avoid sending any other
// message from the producer
connection_.reset();
ClientImplPtr client = client_.lock();
if (!client) {
state_ = Closed;
lock.unlock();
// Client was already destroyed
if (callback) {
callback(ResultOk);
}
return;
}
lock.unlock();
int requestId = client->newRequestId();
Future<Result, ResponseData> future =
cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
if (callback) {
// Pass the shared pointer "ptr" to the handler to prevent the object from being destroyed
future.addListener(
std::bind(&ProducerImpl::handleClose, shared_from_this(), std::placeholders::_1, callback, ptr));
}
}
void ProducerImpl::handleClose(Result result, ResultCallback callback, ProducerImplPtr producer) {
if (result == ResultOk) {
Lock lock(mutex_);
state_ = Closed;
LOG_INFO(getName() << "Closed producer");
ClientConnectionPtr cnx = getCnx().lock();
if (cnx) {
cnx->removeProducer(producerId_);
}
} else {
LOG_ERROR(getName() << "Failed to close producer: " << strResult(result));
}
if (callback) {
callback(result);
}
}
Future<Result, ProducerImplBaseWeakPtr> ProducerImpl::getProducerCreatedFuture() {
return producerCreatedPromise_.getFuture();
}
uint64_t ProducerImpl::getProducerId() const { return producerId_; }
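// Send-timeout handler. If the oldest pending message has exceeded its timeout,
// all pending messages are failed with ResultTimeout; otherwise the timer is
// re-armed for the remaining time of the oldest pending message.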
void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) {
Lock lock(mutex_);
if (state_ != Ready) {
return;
}
if (err == boost::asio::error::operation_aborted) {
LOG_DEBUG(getName() << "Timer cancelled: " << err.message());
return;
} else if (err) {
LOG_ERROR(getName() << "Timer error: " << err.message());
return;
}
std::shared_ptr<PendingCallbacks> pendingCallbacks;
OpSendMsg msg;
if (!pendingMessagesQueue_.peek(msg)) {
// If there are no pending messages, reset the timeout to the configured value.
sendTimer_->expires_from_now(milliseconds(conf_.getSendTimeout()));
LOG_DEBUG(getName() << "Producer timeout triggered on empty pending message queue");
} else {
// If there is at least one message, calculate the diff between the message timeout and
// the current time.
time_duration diff = msg.timeout_ - TimeUtils::now();
if (diff.total_milliseconds() <= 0) {
// The diff is less than or equal to zero, meaning that the message has expired.
LOG_DEBUG(getName() << "Timer expired. Calling timeout callbacks.");
pendingCallbacks = getPendingCallbacksWhenFailed();
// Since the pending queue is cleared now, set timer to expire after configured value.
sendTimer_->expires_from_now(milliseconds(conf_.getSendTimeout()));
} else {
// The diff is greater than zero, set the timeout to the diff value
LOG_DEBUG(getName() << "Timer hasn't expired yet, setting new timeout " << diff);
sendTimer_->expires_from_now(diff);
}
}
// Asynchronously wait for the timeout to trigger
sendTimer_->async_wait(
std::bind(&ProducerImpl::handleSendTimeout, shared_from_this(), std::placeholders::_1));
lock.unlock();
if (pendingCallbacks) {
pendingCallbacks->complete(ResultTimeout);
}
}
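// Invoked when the broker reports a checksum error for a message. If the reported
// sequence id matches the head of the pending queue, the message is removed and
// its callback is completed with ResultChecksumError. Returns false only when the
// reported sequence id is ahead of the expected one.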
bool ProducerImpl::removeCorruptMessage(uint64_t sequenceId) {
OpSendMsg op;
Lock lock(mutex_);
bool havePendingAck = pendingMessagesQueue_.peek(op);
if (!havePendingAck) {
LOG_DEBUG(getName() << " -- SequenceId - " << sequenceId << "]" //
<< "Got send failure for expired message, ignoring it.");
return true;
}
uint64_t expectedSequenceId = op.sequenceId_;
if (sequenceId > expectedSequenceId) {
LOG_WARN(getName() << "Got ack failure for msg " << sequenceId //
<< " expecting: " << expectedSequenceId << " queue size=" //
<< pendingMessagesQueue_.size() << " producer: " << producerId_);
return false;
} else if (sequenceId < expectedSequenceId) {
LOG_DEBUG(getName() << "Corrupt message is already timed out. Ignoring msg " << sequenceId);
return true;
} else {
LOG_DEBUG(getName() << "Remove corrupt message from queue " << sequenceId);
pendingMessagesQueue_.pop();
lock.unlock();
if (op.sendCallback_) {
// to protect from client callback exception
try {
op.sendCallback_(ResultChecksumError, op.msg_.getMessageId());
} catch (const std::exception& e) {
LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
}
}
return true;
}
}
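// Handles a SEND_RECEIPT (ack) from the broker: pops the acknowledged entry from
// the pending queue, releases the extra spots reserved for batched messages,
// updates lastSequenceIdPublished_ and invokes the send callback. Returns false
// only when the acked sequence id is ahead of the expected one.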
bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
MessageId messageId(partition_, rawMessageId.ledgerId(), rawMessageId.entryId(),
rawMessageId.batchIndex());
OpSendMsg op;
Lock lock(mutex_);
bool havePendingAck = pendingMessagesQueue_.peek(op);
if (!havePendingAck) {
LOG_DEBUG(getName() << " -- SequenceId - " << sequenceId << "]" //
<< " -- MessageId - " << messageId << "]"
<< "Got an SEND_ACK for expired message, ignoring it.");
return true;
}
uint64_t expectedSequenceId = op.sequenceId_;
if (sequenceId > expectedSequenceId) {
LOG_WARN(getName() << "Got ack for msg " << sequenceId //
<< " expecting: " << expectedSequenceId << " queue size=" //
<< pendingMessagesQueue_.size() << " producer: " << producerId_);
return false;
} else if (sequenceId < expectedSequenceId) {
// Ignoring the ack since it's referring to a message that has already timed out.
LOG_DEBUG(getName() << "Got ack for timed out msg " << sequenceId //
<< " -- MessageId - " << messageId << " last-seq: " << expectedSequenceId
<< " producer: " << producerId_);
return true;
} else {
// Message was persisted correctly
LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
pendingMessagesQueue_.pop();
// Release the additional spots in the queue: one spot was reserved per message in
// the batch, but only a single queue entry is being removed.
pendingMessagesQueue_.release(op.num_messages_in_batch() - 1);
lastSequenceIdPublished_ = sequenceId + op.num_messages_in_batch() - 1;
lock.unlock();
if (op.sendCallback_) {
try {
op.sendCallback_(ResultOk, messageId);
} catch (const std::exception& e) {
LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
}
}
return true;
}
}
bool ProducerImpl::encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& payload,
SharedBuffer& encryptedPayload) {
if (!conf_.isEncryptionEnabled() || msgCrypto_ == NULL) {
encryptedPayload = payload;
return true;
}
return msgCrypto_->encrypt(conf_.getEncryptionKeys(), conf_.getCryptoKeyReader(), metadata, payload,
encryptedPayload);
}
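// The broker has notified that this producer was closed on its side: drop the
// current connection and schedule a reconnection.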
void ProducerImpl::disconnectProducer() {
LOG_DEBUG("Broker notification of Closed producer: " << producerId_);
Lock lock(mutex_);
connection_.reset();
lock.unlock();
scheduleReconnection(shared_from_this());
}
const std::string& ProducerImpl::getName() const { return producerStr_; }
void ProducerImpl::start() { HandlerBase::start(); }
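// Marks the producer as Closed locally without notifying the broker; used when the
// client itself is shutting down.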
void ProducerImpl::shutdown() {
Lock lock(mutex_);
state_ = Closed;
cancelTimers();
producerCreatedPromise_.setFailed(ResultAlreadyClosed);
}
void ProducerImpl::cancelTimers() {
if (dataKeyGenTImer_) {
dataKeyGenTImer_->cancel();
dataKeyGenTImer_.reset();
}
if (batchTimer_) {
batchTimer_->cancel();
batchTimer_.reset();
}
if (sendTimer_) {
sendTimer_->cancel();
sendTimer_.reset();
}
}
bool ProducerImplCmp::operator()(const ProducerImplPtr& a, const ProducerImplPtr& b) const {
return a->getProducerId() < b->getProducerId();
}
bool ProducerImpl::isClosed() {
Lock lock(mutex_);
return state_ == Closed;
}
} // namespace pulsar