[C++] PIP 37: Support large message size (#13627)
* Support configuring chunkingEnabled
* Add limitations when chunking is enabled
* [WIP] serializeAndSendMessage
* Pass TopicName instead of string to ProducerImpl's constructor
* Send messages by chunks
* Check whether the callback should be triggered in sendMessage
* Combine callback and releaseSemaphore
* Wrap the send callback with stats update
* Fix incorrect isValidProducerState
* Fix checksum error when chunks are sent
* Add chunked configs from consumer
* Support consuming chunks
* Fix incorrect concanated payload size
* Add tests for chunking messages
* Fixed tests failure when compression is enabled
* Improve logs
* Refactor chunking related fields and fix memory error
* Fix comments
* Add MapCache class
* Use MapCache to refactor ConsumerImpl
* Verify the chunk cache is cleared
* Fix chunked cache
* Fix CentOS 7 build
* Fix Ubuntu 16.04 build failure
* Fix incompatibility with GTest 1.8.0
* Fix tests
* Fix GCC 5.4 segmentation fault
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index 201eba3..2cdbf47 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -437,6 +437,52 @@
*/
int getPriorityLevel() const;
+ /**
+ * Consumer buffers chunk messages into memory until it receives all the chunks of the original message.
+ * While consuming chunk-messages, chunks from same message might not be contiguous in the stream and they
+ * might be mixed with other messages' chunks. so, consumer has to maintain multiple buffers to manage
+ * chunks coming from different messages. This mainly happens when multiple publishers are publishing
+ * messages on the topic concurrently or publisher failed to publish all chunks of the messages.
+ *
+ * eg: M1-C1, M2-C1, M1-C2, M2-C2
+ * Here, Messages M1-C1 and M1-C2 belong to original message M1, M2-C1 and M2-C2 belong to M2 message.
+ *
+ * Buffering large number of outstanding uncompleted chunked messages can create memory pressure and it
+ * can be guarded by providing this maxPendingChunkedMessage threshold. Once, consumer reaches this
+ * threshold, it drops the outstanding unchunked-messages by silently acking or asking broker to redeliver
+ * later by marking it unacked. See setAutoOldestChunkedMessageOnQueueFull.
+ *
+ * If it's zero, the pending chunked messages will not be limited.
+ *
+ * Default: 100
+ *
+ * @param maxPendingChunkedMessage the number of max pending chunked messages
+ */
+ ConsumerConfiguration& setMaxPendingChunkedMessage(size_t maxPendingChunkedMessage);
+
+ /**
+ * The associated getter of setMaxPendingChunkedMessage
+ */
+ size_t getMaxPendingChunkedMessage() const;
+
+ /**
+ * Buffering large number of outstanding uncompleted chunked messages can create memory pressure and it
+ * can be guarded by providing the maxPendingChunkedMessage threshold. See setMaxPendingChunkedMessage.
+ * Once, consumer reaches this threshold, it drops the outstanding unchunked-messages by silently acking
+ * if autoAckOldestChunkedMessageOnQueueFull is true else it marks them for redelivery.
+ *
+ * Default: false
+ *
+ * @param autoAckOldestChunkedMessageOnQueueFull whether to ack the discarded chunked message
+ */
+ ConsumerConfiguration& setAutoOldestChunkedMessageOnQueueFull(
+ bool autoAckOldestChunkedMessageOnQueueFull);
+
+ /**
+ * The associated getter of setAutoOldestChunkedMessageOnQueueFull
+ */
+ bool isAutoOldestChunkedMessageOnQueueFull() const;
+
friend class PulsarWrapper;
private:
diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 5c2792a..7c278dd 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -473,6 +473,34 @@
*/
ProducerConfiguration& setProperties(const std::map<std::string, std::string>& properties);
+ /**
+ * If message size is higher than allowed max publish-payload size by broker then enableChunking helps
+ * producer to split message into multiple chunks and publish them to broker separately in order. So, it
+ * allows client to successfully publish large size of messages in pulsar.
+ *
+ * Set it true to enable this feature. If so, you must disable batching (see setBatchingEnabled),
+ * otherwise the producer creation will fail.
+ *
+ * There are some other recommendations when it's enabled:
+ * 1. This features is right now only supported for non-shared subscription and persistent-topic.
+ * 2. It's better to reduce setMaxPendingMessages to avoid producer accupying large amount of memory by
+ * buffered messages.
+ * 3. Set message-ttl on the namespace to cleanup chunked messages. Sometimes due to broker-restart or
+ * publish time, producer might fail to publish entire large message. So, consumer will not be able to
+ * consume and ack those messages.
+ *
+ * Default: false
+ *
+ * @param chunkingEnabled whether chunking is enabled
+ * @return the ProducerConfiguration self
+ */
+ ProducerConfiguration& setChunkingEnabled(bool chunkingEnabled);
+
+ /**
+ * The getter associated with setChunkingEnabled().
+ */
+ bool isChunkingEnabled() const;
+
friend class PulsarWrapper;
private:
diff --git a/pulsar-client-cpp/lib/BatchMessageContainerBase.cc b/pulsar-client-cpp/lib/BatchMessageContainerBase.cc
index 1201707..e9e6b98 100644
--- a/pulsar-client-cpp/lib/BatchMessageContainerBase.cc
+++ b/pulsar-client-cpp/lib/BatchMessageContainerBase.cc
@@ -74,7 +74,8 @@
return ResultMessageTooBig;
}
- opSendMsg.msg_.impl_ = impl;
+ opSendMsg.metadata_ = impl->metadata;
+ opSendMsg.payload_ = impl->payload;
opSendMsg.sequenceId_ = impl->metadata.sequence_id();
opSendMsg.producerId_ = producerId_;
opSendMsg.timeout_ = TimeUtils::now() + milliseconds(producerConfig_.getSendTimeout());
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 3ad6f40..79bc1d7 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -1366,8 +1366,9 @@
}
void ClientConnection::sendMessageInternal(const OpSendMsg& opSend) {
- PairSharedBuffer buffer = Commands::newSend(outgoingBuffer_, outgoingCmd_, opSend.producerId_,
- opSend.sequenceId_, getChecksumType(), opSend.msg_);
+ PairSharedBuffer buffer =
+ Commands::newSend(outgoingBuffer_, outgoingCmd_, opSend.producerId_, opSend.sequenceId_,
+ getChecksumType(), opSend.metadata_, opSend.payload_);
asyncWrite(buffer, customAllocWriteHandler(std::bind(&ClientConnection::handleSendPair,
shared_from_this(), std::placeholders::_1)));
@@ -1408,8 +1409,9 @@
assert(any.type() == typeid(OpSendMsg));
const OpSendMsg& op = boost::any_cast<const OpSendMsg&>(any);
- PairSharedBuffer buffer = Commands::newSend(outgoingBuffer_, outgoingCmd_, op.producerId_,
- op.sequenceId_, getChecksumType(), op.msg_);
+ PairSharedBuffer buffer =
+ Commands::newSend(outgoingBuffer_, outgoingCmd_, op.producerId_, op.sequenceId_,
+ getChecksumType(), op.metadata_, op.payload_);
asyncWrite(buffer, customAllocWriteHandler(std::bind(&ClientConnection::handleSendPair,
shared_from_this(), std::placeholders::_1)));
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 8cdaacc..0b07b6e 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -29,6 +29,7 @@
#include <pulsar/ConsoleLoggerFactory.h>
#include <boost/algorithm/string/predicate.hpp>
#include <sstream>
+#include <stdexcept>
#include <lib/HTTPLookupService.h>
#include <lib/TopicName.h>
#include <algorithm>
@@ -147,6 +148,9 @@
void ClientImpl::createProducerAsync(const std::string& topic, ProducerConfiguration conf,
CreateProducerCallback callback) {
+ if (conf.isChunkingEnabled() && conf.getBatchingEnabled()) {
+ throw std::invalid_argument("Batching and chunking of messages can't be enabled together");
+ }
TopicNamePtr topicName;
{
Lock lock(mutex_);
@@ -174,7 +178,7 @@
producer = std::make_shared<PartitionedProducerImpl>(shared_from_this(), topicName,
partitionMetadata->getPartitions(), conf);
} else {
- producer = std::make_shared<ProducerImpl>(shared_from_this(), topicName->toString(), conf);
+ producer = std::make_shared<ProducerImpl>(shared_from_this(), *topicName, conf);
}
producer->getProducerCreatedFuture().addListener(
std::bind(&ClientImpl::handleProducerCreated, shared_from_this(), std::placeholders::_1,
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 1094efb..472c15b 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -141,10 +141,8 @@
}
PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, uint64_t producerId,
- uint64_t sequenceId, ChecksumType checksumType, const Message& msg) {
- const proto::MessageMetadata& metadata = msg.impl_->metadata;
- SharedBuffer& payload = msg.impl_->payload;
-
+ uint64_t sequenceId, ChecksumType checksumType,
+ const proto::MessageMetadata& metadata, const SharedBuffer& payload) {
cmd.set_type(BaseCommand::SEND);
CommandSend* send = cmd.mutable_send();
send->set_producer_id(producerId);
@@ -152,6 +150,9 @@
if (metadata.has_num_messages_in_batch()) {
send->set_num_messages(metadata.num_messages_in_batch());
}
+ if (metadata.has_chunk_id()) {
+ send->set_is_chunk(true);
+ }
// / Wire format
// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
@@ -199,7 +200,8 @@
int metadataStartIndex = checksumReaderIndex + checksumSize;
uint32_t metadataChecksum =
computeChecksum(0, headers.data() + metadataStartIndex, (writeIndex - metadataStartIndex));
- uint32_t computedChecksum = computeChecksum(metadataChecksum, payload.data(), payload.writerIndex());
+ uint32_t computedChecksum =
+ computeChecksum(metadataChecksum, payload.data(), payload.readableBytes());
// set computed checksum
headers.setWriterIndex(checksumReaderIndex);
headers.writeUnsignedInt(computedChecksum);
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index e720571..bab2211 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -80,7 +80,8 @@
const std::string& listenerName);
static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& cmd, uint64_t producerId,
- uint64_t sequenceId, ChecksumType checksumType, const Message& msg);
+ uint64_t sequenceId, ChecksumType checksumType,
+ const proto::MessageMetadata& metadata, const SharedBuffer& payload);
static SharedBuffer newSubscribe(const std::string& topic, const std::string& subscription,
uint64_t consumerId, uint64_t requestId,
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index b01e4e5..d13cb0e 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -231,4 +231,21 @@
KeySharedPolicy ConsumerConfiguration::getKeySharedPolicy() const { return impl_->keySharedPolicy; }
+ConsumerConfiguration& ConsumerConfiguration::setMaxPendingChunkedMessage(size_t maxPendingChunkedMessage) {
+ impl_->maxPendingChunkedMessage = maxPendingChunkedMessage;
+ return *this;
+}
+
+size_t ConsumerConfiguration::getMaxPendingChunkedMessage() const { return impl_->maxPendingChunkedMessage; }
+
+ConsumerConfiguration& ConsumerConfiguration::setAutoOldestChunkedMessageOnQueueFull(
+ bool autoAckOldestChunkedMessageOnQueueFull) {
+ impl_->autoAckOldestChunkedMessageOnQueueFull = autoAckOldestChunkedMessageOnQueueFull;
+ return *this;
+}
+
+bool ConsumerConfiguration::isAutoOldestChunkedMessageOnQueueFull() const {
+ return impl_->autoAckOldestChunkedMessageOnQueueFull;
+}
+
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 75f65a7..9c2a461 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -50,6 +50,8 @@
std::map<std::string, std::string> properties;
int priorityLevel{0};
KeySharedPolicy keySharedPolicy;
+ size_t maxPendingChunkedMessage{100};
+ bool autoAckOldestChunkedMessageOnQueueFull{false};
};
} // namespace pulsar
#endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 77c0fa9..4d4a135 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -63,7 +63,9 @@
negativeAcksTracker_(client, *this, conf),
ackGroupingTrackerPtr_(std::make_shared<AckGroupingTracker>()),
readCompacted_(conf.isReadCompacted()),
- lastMessageInBroker_(Optional<MessageId>::of(MessageId())) {
+ lastMessageInBroker_(Optional<MessageId>::of(MessageId())),
+ maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
+ autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoOldestChunkedMessageOnQueueFull()) {
std::stringstream consumerStrStream;
consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << consumerId_ << "] ";
consumerStr_ = consumerStrStream.str();
@@ -308,6 +310,80 @@
callback(result);
}
+Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload,
+ const proto::MessageMetadata& metadata,
+ const MessageId& messageId,
+ const proto::MessageIdData& messageIdData,
+ const ClientConnectionPtr& cnx) {
+ const auto chunkId = metadata.chunk_id();
+ const auto uuid = metadata.uuid();
+ LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " << uuid
+ << ", messageId: " << messageId << ") of "
+ << payload.readableBytes() << " bytes");
+
+ Lock lock(chunkProcessMutex_);
+ auto it = chunkedMessageCache_.find(uuid);
+
+ if (chunkId == 0) {
+ if (it == chunkedMessageCache_.end()) {
+ it = chunkedMessageCache_.putIfAbsent(
+ uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()});
+ }
+ if (maxPendingChunkedMessage_ > 0 && chunkedMessageCache_.size() >= maxPendingChunkedMessage_) {
+ chunkedMessageCache_.removeOldestValues(
+ chunkedMessageCache_.size() - maxPendingChunkedMessage_ + 1,
+ [this, messageId](const std::string& uuid, const ChunkedMessageCtx& ctx) {
+ if (autoAckOldestChunkedMessageOnQueueFull_) {
+ doAcknowledgeIndividual(messageId, [uuid, messageId](Result result) {
+ if (result != ResultOk) {
+ LOG_WARN("Failed to acknowledge discarded chunk, uuid: "
+ << uuid << ", messageId: " << messageId);
+ }
+ });
+ } else {
+ trackMessage(messageId);
+ }
+ });
+ it = chunkedMessageCache_.putIfAbsent(
+ uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()});
+ }
+ }
+
+ auto& chunkedMsgCtx = it->second;
+ if (it == chunkedMessageCache_.end() || !chunkedMsgCtx.validateChunkId(chunkId)) {
+ if (it == chunkedMessageCache_.end()) {
+ LOG_ERROR("Received an uncached chunk (uuid: " << uuid << " chunkId: " << chunkId
+ << ", messageId: " << messageId << ")");
+ } else {
+ LOG_ERROR("Received a chunk whose chunk id is invalid (uuid: "
+ << uuid << " chunkId: " << chunkId << ", messageId: " << messageId << ")");
+ chunkedMessageCache_.remove(uuid);
+ }
+ lock.unlock();
+ increaseAvailablePermits(cnx);
+ trackMessage(messageId);
+ return Optional<SharedBuffer>::empty();
+ }
+
+ chunkedMsgCtx.appendChunk(messageId, payload);
+ if (!chunkedMsgCtx.isCompleted()) {
+ lock.unlock();
+ increaseAvailablePermits(cnx);
+ return Optional<SharedBuffer>::empty();
+ }
+
+ LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx
+ << ", sequenceId: " << metadata.sequence_id());
+
+ auto wholePayload = chunkedMsgCtx.getBuffer();
+ chunkedMessageCache_.remove(uuid);
+ if (uncompressMessageIfNeeded(cnx, messageIdData, metadata, wholePayload, false)) {
+ return Optional<SharedBuffer>::of(wholePayload);
+ } else {
+ return Optional<SharedBuffer>::empty();
+ }
+}
+
void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
bool& isChecksumValid, proto::MessageMetadata& metadata,
SharedBuffer& payload) {
@@ -318,17 +394,39 @@
return;
}
- if (!uncompressMessageIfNeeded(cnx, msg, metadata, payload)) {
- // Message was discarded on decompression error
- return;
- }
-
if (!isChecksumValid) {
// Message discarded for checksum error
discardCorruptedMessage(cnx, msg.message_id(), proto::CommandAck::ChecksumMismatch);
return;
}
+ const bool isMessageDecryptable =
+ metadata.encryption_keys_size() <= 0 || config_.getCryptoKeyReader().get() ||
+ config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME;
+
+ const bool isChunkedMessage = metadata.num_chunks_from_msg() > 1 &&
+ config_.getConsumerType() != ConsumerType::ConsumerShared &&
+ config_.getConsumerType() != ConsumerType::ConsumerKeyShared;
+ if (isMessageDecryptable && !isChunkedMessage) {
+ if (!uncompressMessageIfNeeded(cnx, msg.message_id(), metadata, payload, true)) {
+ // Message was discarded on decompression error
+ return;
+ }
+ }
+
+ // Only a non-batched messages can be a chunk
+ if (!metadata.has_num_messages_in_batch() && isChunkedMessage) {
+ const auto& messageIdData = msg.message_id();
+ MessageId messageId(messageIdData.partition(), messageIdData.ledgerid(), messageIdData.entryid(),
+ messageIdData.batch_index());
+ auto optionalPayload = processMessageChunk(payload, metadata, messageId, messageIdData, cnx);
+ if (optionalPayload.is_present()) {
+ payload = optionalPayload.value();
+ } else {
+ return;
+ }
+ }
+
Message m(msg, metadata, payload, partitionIndex_);
m.impl_->cnx_ = cnx.get();
m.impl_->setTopicName(topic_);
@@ -528,8 +626,10 @@
return false;
}
-bool ConsumerImpl::uncompressMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
- const proto::MessageMetadata& metadata, SharedBuffer& payload) {
+bool ConsumerImpl::uncompressMessageIfNeeded(const ClientConnectionPtr& cnx,
+ const proto::MessageIdData& messageIdData,
+ const proto::MessageMetadata& metadata, SharedBuffer& payload,
+ bool checkMaxMessageSize) {
if (!metadata.has_compression()) {
return true;
}
@@ -539,12 +639,11 @@
uint32_t uncompressedSize = metadata.uncompressed_size();
uint32_t payloadSize = payload.readableBytes();
if (cnx) {
- if (payloadSize > ClientConnection::getMaxMessageSize()) {
+ if (checkMaxMessageSize && payloadSize > ClientConnection::getMaxMessageSize()) {
// Uncompressed size is itself corrupted since it cannot be bigger than the MaxMessageSize
LOG_ERROR(getName() << "Got corrupted payload message size " << payloadSize //
- << " at " << msg.message_id().ledgerid() << ":"
- << msg.message_id().entryid());
- discardCorruptedMessage(cnx, msg.message_id(), proto::CommandAck::UncompressedSizeCorruption);
+ << " at " << messageIdData.ledgerid() << ":" << messageIdData.entryid());
+ discardCorruptedMessage(cnx, messageIdData, proto::CommandAck::UncompressedSizeCorruption);
return false;
}
} else {
@@ -554,8 +653,8 @@
if (!CompressionCodecProvider::getCodec(compressionType).decode(payload, uncompressedSize, payload)) {
LOG_ERROR(getName() << "Failed to decompress message with " << uncompressedSize //
- << " at " << msg.message_id().ledgerid() << ":" << msg.message_id().entryid());
- discardCorruptedMessage(cnx, msg.message_id(), proto::CommandAck::DecompressionError);
+ << " at " << messageIdData.ledgerid() << ":" << messageIdData.entryid());
+ discardCorruptedMessage(cnx, messageIdData, proto::CommandAck::DecompressionError);
return false;
}
@@ -584,7 +683,7 @@
// This will only happen when the connection got reset and we cleared the queue
return;
}
- trackMessage(msg);
+ trackMessage(msg.getMessageId());
try {
consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
lastDequedMessage_ = Optional<MessageId>::of(msg.getMessageId());
@@ -732,7 +831,7 @@
increaseAvailablePermits(currentCnx);
if (track) {
- trackMessage(msg);
+ trackMessage(msg.getMessageId());
}
}
@@ -1240,11 +1339,11 @@
negativeAcksTracker_.setEnabledForTesting(enabled);
}
-void ConsumerImpl::trackMessage(const Message& msg) {
+void ConsumerImpl::trackMessage(const MessageId& messageId) {
if (hasParent_) {
- unAckedMessageTrackerPtr_->remove(msg.getMessageId());
+ unAckedMessageTrackerPtr_->remove(messageId);
} else {
- unAckedMessageTrackerPtr_->add(msg.getMessageId());
+ unAckedMessageTrackerPtr_->add(messageId);
}
}
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index 0754a89..2bdb82f 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -40,6 +40,7 @@
#include "BatchAcknowledgementTracker.h"
#include <limits>
#include <lib/BrokerConsumerStatsImpl.h>
+#include <lib/MapCache.h>
#include <lib/stats/ConsumerStatsImpl.h>
#include <lib/stats/ConsumerStatsDisabled.h>
#include <queue>
@@ -157,8 +158,9 @@
private:
bool waitingForZeroQueueSizeMessage;
- bool uncompressMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
- const proto::MessageMetadata& metadata, SharedBuffer& payload);
+ bool uncompressMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::MessageIdData& messageIdData,
+ const proto::MessageMetadata& metadata, SharedBuffer& payload,
+ bool checkMaxMessageSize);
void discardCorruptedMessage(const ClientConnectionPtr& cnx, const proto::MessageIdData& messageId,
proto::CommandAck::ValidationError validationError);
void increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int delta = 1);
@@ -177,7 +179,7 @@
void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);
void failPendingReceiveCallback();
void setNegativeAcknowledgeEnabledForTesting(bool enabled) override;
- void trackMessage(const Message& msg);
+ void trackMessage(const MessageId& messageId);
Optional<MessageId> clearReceiveQueue();
@@ -227,6 +229,83 @@
return lastMessageInBroker_.is_present() ? lastMessageInBroker_.value() : MessageId::earliest();
}
+ class ChunkedMessageCtx {
+ public:
+ ChunkedMessageCtx() : totalChunks_(0) {}
+ ChunkedMessageCtx(int totalChunks, int totalChunkMessageSize)
+ : totalChunks_(totalChunks), chunkedMsgBuffer_(SharedBuffer::allocate(totalChunkMessageSize)) {
+ chunkedMessageIds_.reserve(totalChunks);
+ }
+
+ ChunkedMessageCtx(const ChunkedMessageCtx&) = delete;
+ // Here we don't use =default to be compatible with GCC 4.8
+ ChunkedMessageCtx(ChunkedMessageCtx&& rhs) noexcept
+ : totalChunks_(rhs.totalChunks_),
+ chunkedMsgBuffer_(std::move(rhs.chunkedMsgBuffer_)),
+ chunkedMessageIds_(std::move(rhs.chunkedMessageIds_)) {}
+
+ bool validateChunkId(int chunkId) const noexcept { return chunkId == numChunks(); }
+
+ void appendChunk(const MessageId& messageId, const SharedBuffer& payload) {
+ chunkedMessageIds_.emplace_back(messageId);
+ chunkedMsgBuffer_.write(payload.data(), payload.readableBytes());
+ }
+
+ bool isCompleted() const noexcept { return totalChunks_ == numChunks(); }
+
+ const SharedBuffer& getBuffer() const noexcept { return chunkedMsgBuffer_; }
+
+ const std::vector<MessageId>& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; }
+
+ friend std::ostream& operator<<(std::ostream& os, const ChunkedMessageCtx& ctx) {
+ return os << "ChunkedMessageCtx " << ctx.chunkedMsgBuffer_.readableBytes() << " of "
+ << ctx.chunkedMsgBuffer_.writerIndex() << " bytes, " << ctx.numChunks() << " of "
+ << ctx.totalChunks_ << " chunks";
+ }
+
+ private:
+ const int totalChunks_;
+ SharedBuffer chunkedMsgBuffer_;
+ std::vector<MessageId> chunkedMessageIds_;
+
+ int numChunks() const noexcept { return static_cast<int>(chunkedMessageIds_.size()); }
+ };
+
+ const size_t maxPendingChunkedMessage_;
+ // if queue size is reasonable (most of the time equal to number of producers try to publish messages
+ // concurrently on the topic) then it guards against broken chunked message which was not fully published
+ const bool autoAckOldestChunkedMessageOnQueueFull_;
+
+ // The key is UUID, value is the associated ChunkedMessageCtx of the chunked message.
+ std::unordered_map<std::string, ChunkedMessageCtx> chunkedMessagesMap_;
+ // This list contains all the keys of `chunkedMessagesMap_`, each key is an UUID that identifies a pending
+ // chunked message. Once the number of pending chunked messages exceeds the limit, the oldest UUIDs and
+ // the associated ChunkedMessageCtx will be removed.
+ std::list<std::string> pendingChunkedMessageUuidQueue_;
+
+ // The key is UUID, value is the associated ChunkedMessageCtx of the chunked message.
+ MapCache<std::string, ChunkedMessageCtx> chunkedMessageCache_;
+ mutable std::mutex chunkProcessMutex_;
+
+ /**
+ * Process a chunk. If the chunk is the last chunk of a message, concatenate all buffered chunks into the
+ * payload and return it.
+ *
+ * @param payload the payload of a chunk
+ * @param metadata the message metadata
+ * @param messageId
+ * @param messageIdData
+ * @param cnx
+ *
+ * @return the concatenated payload if chunks are concatenated into a completed message payload
+ * successfully, else Optional::empty()
+ */
+ Optional<SharedBuffer> processMessageChunk(const SharedBuffer& payload,
+ const proto::MessageMetadata& metadata,
+ const MessageId& messageId,
+ const proto::MessageIdData& messageIdData,
+ const ClientConnectionPtr& cnx);
+
friend class PulsarFriend;
// these two declared friend to access setNegativeAcknowledgeEnabledForTesting
diff --git a/pulsar-client-cpp/lib/MapCache.h b/pulsar-client-cpp/lib/MapCache.h
new file mode 100644
index 0000000..b9a0069
--- /dev/null
+++ b/pulsar-client-cpp/lib/MapCache.h
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <deque>
+#include <functional>
+#include <unordered_map>
+#include <vector>
+
+namespace pulsar {
+
+// A map cache that supports removing the first N oldest entries from the map.
+// Value must be moveable and have the default constructor.
+template <typename Key, typename Value>
+class MapCache {
+ std::unordered_map<Key, Value> map_;
+ std::deque<Key> keys_;
+
+ public:
+ using const_iterator = typename decltype(map_)::const_iterator;
+ using iterator = typename decltype(map_)::iterator;
+
+ MapCache() = default;
+ // Here we don't use =default to be compatible with GCC 4.8
+ MapCache(MapCache&& rhs) noexcept : map_(std::move(rhs.map_)), keys_(std::move(rhs.keys_)) {}
+
+ size_t size() const noexcept { return map_.size(); }
+
+ const_iterator find(const Key& key) const { return map_.find(key); }
+ iterator find(const Key& key) { return map_.find(key); }
+
+ const_iterator end() const noexcept { return map_.end(); }
+ iterator end() noexcept { return map_.end(); }
+
+ iterator putIfAbsent(const Key& key, Value&& value) {
+ auto it = map_.find(key);
+ if (it == map_.end()) {
+ keys_.push_back(key);
+ return map_.emplace(key, std::move(value)).first;
+ } else {
+ return end();
+ }
+ }
+
+ void removeOldestValues(size_t numToRemove,
+ const std::function<void(const Key&, const Value&)>& callback) {
+ for (size_t i = 0; !keys_.empty() && i < numToRemove; i++) {
+ const auto key = keys_.front();
+ auto it = map_.find(key);
+ if (it != map_.end()) {
+ if (callback) {
+ callback(it->first, it->second);
+ }
+ map_.erase(it);
+ }
+ keys_.pop_front();
+ }
+ }
+
+ void remove(const Key& key) {
+ auto it = map_.find(key);
+ if (it != map_.end()) {
+ removeKeyFromKeys(key);
+ map_.erase(it);
+ }
+ }
+
+ // Following methods are only used for tests
+ std::vector<Key> getKeys() const {
+ std::vector<Key> keys;
+ for (auto key : keys_) {
+ keys.emplace_back(key);
+ }
+ return keys;
+ }
+
+ private:
+ void removeKeyFromKeys(const Key& key) {
+ for (auto it = keys_.begin(); it != keys_.end(); ++it) {
+ if (*it == key) {
+ keys_.erase(it);
+ break;
+ }
+ }
+ }
+};
+
+} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/MemoryLimitController.cc b/pulsar-client-cpp/lib/MemoryLimitController.cc
index 81578c9..4a23f8b 100644
--- a/pulsar-client-cpp/lib/MemoryLimitController.cc
+++ b/pulsar-client-cpp/lib/MemoryLimitController.cc
@@ -25,6 +25,10 @@
: memoryLimit_(memoryLimit), currentUsage_(0), mutex_(), condition_() {}
bool MemoryLimitController::tryReserveMemory(uint64_t size) {
+ // Avoid CAS operation when size is 0
+ if (size == 0) {
+ return true;
+ }
while (true) {
uint64_t current = currentUsage_;
uint64_t newUsage = current + size;
@@ -66,4 +70,4 @@
uint64_t MemoryLimitController::currentUsage() const { return currentUsage_; }
-} // namespace pulsar
\ No newline at end of file
+} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/OpSendMsg.h b/pulsar-client-cpp/lib/OpSendMsg.h
index 70b880c..365301b 100644
--- a/pulsar-client-cpp/lib/OpSendMsg.h
+++ b/pulsar-client-cpp/lib/OpSendMsg.h
@@ -29,7 +29,8 @@
namespace pulsar {
struct OpSendMsg {
- Message msg_;
+ proto::MessageMetadata metadata_;
+ SharedBuffer payload_;
SendCallback sendCallback_;
uint64_t producerId_;
uint64_t sequenceId_;
@@ -39,15 +40,24 @@
OpSendMsg() = default;
- OpSendMsg(const Message& msg, const SendCallback& sendCallback, uint64_t producerId, uint64_t sequenceId,
- int sendTimeoutMs, uint32_t messagesCount, uint64_t messagesSize)
- : msg_(msg),
+ OpSendMsg(const proto::MessageMetadata& metadata, const SharedBuffer& payload,
+ const SendCallback& sendCallback, uint64_t producerId, uint64_t sequenceId, int sendTimeoutMs,
+ uint32_t messagesCount, uint64_t messagesSize)
+ : metadata_(metadata), // the copy happens here because OpSendMsg of chunks are constructed with the
+ // a shared metadata object
+ payload_(payload),
sendCallback_(sendCallback),
producerId_(producerId),
sequenceId_(sequenceId),
timeout_(TimeUtils::now() + milliseconds(sendTimeoutMs)),
messagesCount_(messagesCount),
messagesSize_(messagesSize) {}
+
+ void complete(Result result, const MessageId& messageId) const {
+ if (sendCallback_) {
+ sendCallback_(result, messageId);
+ }
+ }
};
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index bdd23ed..4f8be3b 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -86,8 +86,7 @@
ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition, bool lazy) {
using namespace std::placeholders;
- std::string topicPartitionName = topicName_->getTopicPartitionName(partition);
- auto producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_, partition);
+ auto producer = std::make_shared<ProducerImpl>(client_, *topicName_, conf_, partition);
if (lazy) {
createLazyPartitionProducer(partition);
@@ -97,7 +96,7 @@
const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));
}
- LOG_DEBUG("Creating Producer for single Partition - " << topicPartitionName);
+ LOG_DEBUG("Creating Producer for single Partition - " << topicName_ << "-partition-" << partition);
return producer;
}
diff --git a/pulsar-client-cpp/lib/ProducerConfiguration.cc b/pulsar-client-cpp/lib/ProducerConfiguration.cc
index 3e027ee..0ee38ca 100644
--- a/pulsar-client-cpp/lib/ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ProducerConfiguration.cc
@@ -251,4 +251,11 @@
return *this;
}
+ProducerConfiguration& ProducerConfiguration::setChunkingEnabled(bool chunkingEnabled) {
+ impl_->chunkingEnabled = chunkingEnabled;
+ return *this;
+}
+
+bool ProducerConfiguration::isChunkingEnabled() const { return impl_->chunkingEnabled; }
+
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
index a41b250..2ac1eba 100644
--- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
@@ -48,6 +48,7 @@
std::set<std::string> encryptionKeys;
ProducerCryptoFailureAction cryptoFailureAction{ProducerCryptoFailureAction::FAIL};
std::map<std::string, std::string> properties;
+ bool chunkingEnabled{false};
};
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index f81e205..87f8552 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -37,15 +37,15 @@
void complete(Result result) {
for (const auto& opSendMsg : opSendMsgs) {
- opSendMsg.sendCallback_(result, opSendMsg.msg_.getMessageId());
+ opSendMsg.complete(result, {});
}
}
};
-ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const ProducerConfiguration& conf,
- int32_t partition)
+ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
+ const ProducerConfiguration& conf, int32_t partition)
: HandlerBase(
- client, topic,
+ client, (partition < 0) ? topicName.toString() : topicName.getTopicPartitionName(partition),
Backoff(milliseconds(100), seconds(60), milliseconds(std::max(100, conf.getSendTimeout() - 100)))),
conf_(conf),
semaphore_(),
@@ -57,7 +57,8 @@
producerId_(client->newProducerId()),
msgSequenceGenerator_(0),
dataKeyGenIntervalSec_(4 * 60 * 60),
- memoryLimitController_(client->getMemoryLimitController()) {
+ memoryLimitController_(client->getMemoryLimitController()),
+ chunkingEnabled_(conf_.isChunkingEnabled() && topicName.isPersistent() && !conf_.getBatchingEnabled()) {
LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic_
<< " id: " << producerId_);
@@ -323,14 +324,6 @@
}
}
-void ProducerImpl::statsCallBackHandler(Result res, const MessageId& msgId, SendCallback callback,
- boost::posix_time::ptime publishTime) {
- producerStatsBasePtr_->messageReceived(res, publishTime);
- if (callback) {
- callback(res, msgId);
- }
-}
-
void ProducerImpl::flushAsync(FlushCallback callback) {
if (batchMessageContainer_) {
Lock lock(mutex_);
@@ -358,44 +351,61 @@
}
}
+bool ProducerImpl::isValidProducerState(const SendCallback& callback) const {
+ Lock lock(mutex_);
+ const auto state = state_;
+ lock.unlock();
+ switch (state) {
+ case HandlerBase::Ready:
+ // OK
+ case HandlerBase::Pending:
+ // We are OK to queue the messages on the client, it will be sent to the broker once we get the
+ // connection
+ return true;
+ case HandlerBase::Closing:
+ case HandlerBase::Closed:
+ callback(ResultAlreadyClosed, {});
+ return false;
+ case HandlerBase::NotStarted:
+ case HandlerBase::Failed:
+ default:
+ callback(ResultNotConnected, {});
+ return false;
+ }
+}
+
+bool ProducerImpl::canAddToBatch(const Message& msg) const {
+ // If a message has a delayed delivery time, we'll always send it individually
+ return batchMessageContainer_.get() && !msg.impl_->metadata.has_deliver_at_time();
+}
+
+static SharedBuffer applyCompression(const SharedBuffer& uncompressedPayload,
+ CompressionType compressionType) {
+ return CompressionCodecProvider::getCodec(compressionType).encode(uncompressedPayload);
+}
+
void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
producerStatsBasePtr_->messageSent(msg);
- SendCallback cb =
- std::bind(&ProducerImpl::statsCallBackHandler, shared_from_this(), std::placeholders::_1,
- std::placeholders::_2, callback, boost::posix_time::microsec_clock::universal_time());
- // Compress the payload if required
- SharedBuffer& payload = msg.impl_->payload;
-
- uint32_t uncompressedSize = payload.readableBytes();
- uint32_t payloadSize = uncompressedSize;
- ClientConnectionPtr cnx = getCnx().lock();
- if (!batchMessageContainer_) {
- // If batching is enabled we compress all the payloads together before sending the batch
- payload = CompressionCodecProvider::getCodec(conf_.getCompressionType()).encode(payload);
- payloadSize = payload.readableBytes();
-
- // Encrypt the payload if enabled
- SharedBuffer encryptedPayload;
- if (!encryptMessage(msg.impl_->metadata, payload, encryptedPayload)) {
- cb(ResultCryptoError, msg.getMessageId());
- return;
+ const auto now = boost::posix_time::microsec_clock::universal_time();
+ auto self = shared_from_this();
+ sendAsyncWithStatsUpdate(msg, [this, self, now, callback](Result result, const MessageId& messageId) {
+ producerStatsBasePtr_->messageReceived(result, now);
+ if (callback) {
+ callback(result, messageId);
}
- payload = encryptedPayload;
+ });
+}
- if (payloadSize > ClientConnection::getMaxMessageSize()) {
- LOG_DEBUG(getName() << " - compressed Message payload size" << payloadSize << "cannot exceed "
- << ClientConnection::getMaxMessageSize() << " bytes");
- cb(ResultMessageTooBig, msg.getMessageId());
- return;
- }
+void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallback& callback) {
+ if (!isValidProducerState(callback)) {
+ return;
}
- // Reserve a spot in the messages queue before acquiring the ProducerImpl
- // mutex. When the queue is full, this call will block until a spot is
- // available.
- Result res = canEnqueueRequest(payloadSize);
- if (res != ResultOk) {
+ const auto& uncompressedPayload = msg.impl_->payload;
+ const uint32_t uncompressedSize = uncompressedPayload.readableBytes();
+ const auto result = canEnqueueRequest(uncompressedSize);
+ if (result != ResultOk) {
// If queue is full sending the batch immediately, no point waiting till batchMessagetimeout
if (batchMessageContainer_) {
LOG_DEBUG(getName() << " - sending batch message immediately");
@@ -405,43 +415,64 @@
failures.complete();
}
- cb(res, msg.getMessageId());
+ callback(result, {});
return;
}
+ // We have already reserved a spot, so if we need to early return for failed result, we should release the
+ // semaphore and memory first.
+ const auto handleFailedResult = [this, uncompressedSize, callback](Result result) {
+ releaseSemaphore(uncompressedSize); // it releases the memory as well
+ callback(result, {});
+ };
+
+ const bool compressed = !canAddToBatch(msg);
+ const auto payload =
+ compressed ? applyCompression(uncompressedPayload, conf_.getCompressionType()) : uncompressedPayload;
+ const auto compressedSize = static_cast<uint32_t>(payload.readableBytes());
+ const auto maxMessageSize = static_cast<uint32_t>(ClientConnection::getMaxMessageSize());
+
+ if (compressed && compressedSize > ClientConnection::getMaxMessageSize() && !chunkingEnabled_) {
+ LOG_WARN(getName() << " - compressed Message payload size " << payload.readableBytes()
+ << " cannot exceed " << ClientConnection::getMaxMessageSize()
+ << " bytes unless chunking is enabled");
+ handleFailedResult(ResultMessageTooBig);
+ return;
+ }
+
+ auto& msgMetadata = msg.impl_->metadata;
+ if (!msgMetadata.has_replicated_from() && msgMetadata.has_producer_name()) {
+ handleFailedResult(ResultInvalidMessage);
+ return;
+ }
+
+ const int totalChunks =
+ canAddToBatch(msg) ? 1 : getNumOfChunks(compressedSize, ClientConnection::getMaxMessageSize());
+ // Each chunk should be sent individually, so try to acquire extra permits for chunks.
+ for (int i = 0; i < (totalChunks - 1); i++) {
+ const auto result = canEnqueueRequest(0); // size is 0 because the memory has already reserved
+ if (result != ResultOk) {
+ handleFailedResult(result);
+ return;
+ }
+ }
+
Lock lock(mutex_);
- // producers may be lazily starting and be in the pending state
- if (state_ != Ready && state_ != Pending) {
- lock.unlock();
- releaseSemaphore(payloadSize);
- cb(ResultAlreadyClosed, msg.getMessageId());
- return;
- }
-
- if (msg.impl_->metadata.has_producer_name()) {
- // Message had already been sent before
- lock.unlock();
- releaseSemaphore(payloadSize);
- cb(ResultInvalidMessage, msg.getMessageId());
- return;
- }
-
uint64_t sequenceId;
- if (!msg.impl_->metadata.has_sequence_id()) {
+ if (!msgMetadata.has_sequence_id()) {
sequenceId = msgSequenceGenerator_++;
} else {
- sequenceId = msg.impl_->metadata.sequence_id();
+ sequenceId = msgMetadata.sequence_id();
}
setMessageMetadata(msg, sequenceId, uncompressedSize);
- // If we reach this point then you have a reserved spot on the queue
- if (batchMessageContainer_ && !msg.impl_->metadata.has_deliver_at_time()) {
+ if (canAddToBatch(msg)) {
// Batching is enabled and the message is not delayed
if (!batchMessageContainer_->hasEnoughSpace(msg)) {
batchMessageAndSend().complete();
}
bool isFirstMessage = batchMessageContainer_->isFirstMessageToAdd(msg);
- bool isFull = batchMessageContainer_->add(msg, cb);
+ bool isFull = batchMessageContainer_->add(msg, callback);
if (isFirstMessage) {
batchTimer_->expires_from_now(
boost::posix_time::milliseconds(conf_.getBatchingMaxPublishDelayMs()));
@@ -455,10 +486,42 @@
failures.complete();
}
} else {
- sendMessage(OpSendMsg{msg, cb, producerId_, sequenceId, conf_.getSendTimeout(), 1, payloadSize});
+ const bool sendChunks = (totalChunks > 1);
+ if (sendChunks) {
+ msgMetadata.set_uuid(producerName_ + "-" + std::to_string(sequenceId));
+ msgMetadata.set_num_chunks_from_msg(totalChunks);
+ msgMetadata.set_total_chunk_msg_size(compressedSize);
+ }
+
+ int beginIndex = 0;
+ for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
+ if (sendChunks) {
+ msgMetadata.set_chunk_id(chunkId);
+ }
+ const uint32_t endIndex = std::min(compressedSize, beginIndex + maxMessageSize);
+ auto chunkedPayload = payload.slice(beginIndex, endIndex - beginIndex);
+ beginIndex = endIndex;
+
+ SharedBuffer encryptedPayload;
+ if (!encryptMessage(msgMetadata, chunkedPayload, encryptedPayload)) {
+ handleFailedResult(ResultCryptoError);
+ return;
+ }
+
+ sendMessage(OpSendMsg{msgMetadata, encryptedPayload,
+ (chunkId == totalChunks - 1) ? callback : nullptr, producerId_, sequenceId,
+ conf_.getSendTimeout(), 1, uncompressedSize});
+ }
}
}
+int ProducerImpl::getNumOfChunks(uint32_t size, uint32_t maxMessageSize) {
+ if (size >= maxMessageSize && maxMessageSize != 0) {
+ return size / maxMessageSize + ((size % maxMessageSize == 0) ? 0 : 1);
+ }
+ return 1;
+}
+
Result ProducerImpl::canEnqueueRequest(uint32_t payloadSize) {
if (conf_.getBlockIfQueueFull()) {
if (semaphore_) {
@@ -523,7 +586,7 @@
// we need to release the spot manually
LOG_ERROR("batchMessageAndSend | Failed to createOpSendMsg: " << result);
releaseSemaphoreForSendOp(opSendMsg);
- failures.add(std::bind(opSendMsg.sendCallback_, result, MessageId{}));
+ failures.add([opSendMsg, result] { opSendMsg.complete(result, {}); });
}
} else if (numBatches > 1) {
std::vector<OpSendMsg> opSendMsgs;
@@ -537,7 +600,9 @@
LOG_ERROR("batchMessageAndSend | Failed to createOpSendMsgs[" << i
<< "]: " << results[i]);
releaseSemaphoreForSendOp(opSendMsgs[i]);
- failures.add(std::bind(opSendMsgs[i].sendCallback_, results[i], MessageId{}));
+ const auto& opSendMsg = opSendMsgs[i];
+ const auto result = results[i];
+ failures.add([opSendMsg, result] { opSendMsg.complete(result, {}); });
}
}
} // else numBatches is 0, do nothing
@@ -551,7 +616,7 @@
// a. we have a reserved spot on the queue
// b. call this function after acquiring the ProducerImpl mutex_
void ProducerImpl::sendMessage(const OpSendMsg& op) {
- const auto sequenceId = op.msg_.impl_->metadata.sequence_id();
+ const auto sequenceId = op.metadata_.sequence_id();
LOG_DEBUG("Inserting data to pendingMessagesQueue_");
pendingMessagesQueue_.push_back(op);
@@ -748,13 +813,11 @@
LOG_DEBUG(getName() << "Remove corrupt message from queue " << sequenceId);
pendingMessagesQueue_.pop_front();
lock.unlock();
- if (op.sendCallback_) {
+ try {
// to protect from client callback exception
- try {
- op.sendCallback_(ResultChecksumError, op.msg_.getMessageId());
- } catch (const std::exception& e) {
- LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
- }
+ op.complete(ResultChecksumError, {});
+ } catch (const std::exception& e) {
+ LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
}
releaseSemaphoreForSendOp(op);
return true;
@@ -795,12 +858,10 @@
pendingMessagesQueue_.pop_front();
lock.unlock();
- if (op.sendCallback_) {
- try {
- op.sendCallback_(ResultOk, messageId);
- } catch (const std::exception& e) {
- LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
- }
+ try {
+ op.complete(ResultOk, messageId);
+ } catch (const std::exception& e) {
+ LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
}
return true;
}
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index d29efed..3dccf91 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -47,12 +47,13 @@
class Producer;
class MemoryLimitController;
+class TopicName;
class ProducerImpl : public HandlerBase,
public std::enable_shared_from_this<ProducerImpl>,
public ProducerImplBase {
public:
- ProducerImpl(ClientImplPtr client, const std::string& topic,
+ ProducerImpl(ClientImplPtr client, const TopicName& topic,
const ProducerConfiguration& producerConfiguration, int32_t partition = -1);
~ProducerImpl();
@@ -83,6 +84,8 @@
int32_t partition() const noexcept { return partition_; }
+ static int getNumOfChunks(uint32_t size, uint32_t maxMessageSize);
+
protected:
ProducerStatsBasePtr producerStatsBasePtr_;
@@ -115,8 +118,6 @@
void handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
const ResponseData& responseData);
- void statsCallBackHandler(Result, const MessageId&, SendCallback, boost::posix_time::ptime);
-
void handleClose(Result result, ResultCallback callback, ProducerImplPtr producer);
void resendMessages(ClientConnectionPtr cnx);
@@ -125,12 +126,26 @@
bool encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& payload,
SharedBuffer& encryptedPayload);
+ void sendAsyncWithStatsUpdate(const Message& msg, const SendCallback& callback);
+
+ /**
+ * Reserve a spot in the messages queue before acquiring the ProducerImpl mutex. When the queue is full,
+ * this call will block until a spot is available if blockIfQueueIsFull is true. Otherwise, it will return
+ * ResultProducerQueueIsFull immediately.
+ *
+ * It also checks whether the memory could reach the limit after `payloadSize` is added. If so, this call
+ * will block until enough memory could be retained.
+ */
Result canEnqueueRequest(uint32_t payloadSize);
+
void releaseSemaphore(uint32_t payloadSize);
void releaseSemaphoreForSendOp(const OpSendMsg& op);
void cancelTimers();
+ bool isValidProducerState(const SendCallback& callback) const;
+ bool canAddToBatch(const Message& msg) const;
+
typedef std::unique_lock<std::mutex> Lock;
ProducerConfiguration conf_;
@@ -138,7 +153,7 @@
std::unique_ptr<Semaphore> semaphore_;
MessageQueue pendingMessagesQueue_;
- int32_t partition_; // -1 if topic is non-partitioned
+ const int32_t partition_; // -1 if topic is non-partitioned
std::string producerName_;
bool userProvidedProducerName_;
std::string producerStr_;
@@ -169,6 +184,7 @@
uint32_t dataKeyGenIntervalSec_;
MemoryLimitController& memoryLimitController_;
+ const bool chunkingEnabled_;
};
struct ProducerImplCmp {
diff --git a/pulsar-client-cpp/lib/SharedBuffer.h b/pulsar-client-cpp/lib/SharedBuffer.h
index d544299..be889a7 100644
--- a/pulsar-client-cpp/lib/SharedBuffer.h
+++ b/pulsar-client-cpp/lib/SharedBuffer.h
@@ -94,13 +94,13 @@
/**
* Return a shared buffer that include a portion of current buffer. No memory is copied
*/
- SharedBuffer slice(uint32_t offset) {
+ SharedBuffer slice(uint32_t offset) const {
SharedBuffer buf(*this);
buf.consume(offset);
return buf;
}
- SharedBuffer slice(uint32_t offset, uint32_t length) {
+ SharedBuffer slice(uint32_t offset, uint32_t length) const {
SharedBuffer buf(*this);
buf.consume(offset);
assert(buf.readableBytes() >= length);
@@ -165,7 +165,7 @@
}
// Return current writer index
- uint32_t writerIndex() { return writeIdx_; }
+ uint32_t writerIndex() const noexcept { return writeIdx_; }
// skip writerIndex
void skipBytes(uint32_t size) {
@@ -180,7 +180,7 @@
}
// Return current reader index
- uint32_t readerIndex() { return readIdx_; }
+ uint32_t readerIndex() const noexcept { return readIdx_; }
// set readerIndex
void setReaderIndex(uint32_t index) {
diff --git a/pulsar-client-cpp/lib/TopicName.cc b/pulsar-client-cpp/lib/TopicName.cc
index a56b979..2e6232c 100644
--- a/pulsar-client-cpp/lib/TopicName.cc
+++ b/pulsar-client-cpp/lib/TopicName.cc
@@ -216,7 +216,7 @@
return ss.str();
}
-std::string TopicName::toString() {
+std::string TopicName::toString() const {
std::stringstream ss;
std::string seperator("/");
if (isV2Topic_ && cluster_.empty()) {
@@ -230,7 +230,7 @@
bool TopicName::isPersistent() const { return this->domain_ == TopicDomain::Persistent; }
-const std::string TopicName::getTopicPartitionName(unsigned int partition) {
+std::string TopicName::getTopicPartitionName(unsigned int partition) const {
std::stringstream topicPartitionName;
// make this topic name as well
topicPartitionName << toString() << PartitionedProducerImpl::PARTITION_NAME_SUFFIX << partition;
diff --git a/pulsar-client-cpp/lib/TopicName.h b/pulsar-client-cpp/lib/TopicName.h
index 248001c..1d5deab 100644
--- a/pulsar-client-cpp/lib/TopicName.h
+++ b/pulsar-client-cpp/lib/TopicName.h
@@ -55,14 +55,14 @@
std::string getNamespacePortion();
std::string getLocalName();
std::string getEncodedLocalName();
- std::string toString();
+ std::string toString() const;
bool isPersistent() const;
NamespaceNamePtr getNamespaceName();
int getPartitionIndex() const noexcept { return partition_; }
static std::shared_ptr<TopicName> get(const std::string& topicName);
bool operator==(const TopicName& other);
static std::string getEncodedName(const std::string& nameBeforeEncoding);
- const std::string getTopicPartitionName(unsigned int partition);
+ std::string getTopicPartitionName(unsigned int partition) const;
static int getPartitionIndex(const std::string& topic);
private:
diff --git a/pulsar-client-cpp/lib/stats/ProducerStatsBase.h b/pulsar-client-cpp/lib/stats/ProducerStatsBase.h
index 494303b..0ae16d1 100644
--- a/pulsar-client-cpp/lib/stats/ProducerStatsBase.h
+++ b/pulsar-client-cpp/lib/stats/ProducerStatsBase.h
@@ -27,7 +27,7 @@
class ProducerStatsBase {
public:
virtual void messageSent(const Message& msg) = 0;
- virtual void messageReceived(Result&, boost::posix_time::ptime&) = 0;
+ virtual void messageReceived(Result, const boost::posix_time::ptime&) = 0;
virtual ~ProducerStatsBase(){};
};
diff --git a/pulsar-client-cpp/lib/stats/ProducerStatsDisabled.h b/pulsar-client-cpp/lib/stats/ProducerStatsDisabled.h
index f81c8aa..6568c07 100644
--- a/pulsar-client-cpp/lib/stats/ProducerStatsDisabled.h
+++ b/pulsar-client-cpp/lib/stats/ProducerStatsDisabled.h
@@ -25,7 +25,7 @@
class ProducerStatsDisabled : public ProducerStatsBase {
public:
virtual void messageSent(const Message& msg){};
- virtual void messageReceived(Result&, boost::posix_time::ptime&){};
+ virtual void messageReceived(Result, const boost::posix_time::ptime&){};
};
} // namespace pulsar
#endif // PULSAR_PRODUCER_STATS_DISABLED_HEADER
diff --git a/pulsar-client-cpp/lib/stats/ProducerStatsImpl.cc b/pulsar-client-cpp/lib/stats/ProducerStatsImpl.cc
index af7ae4b..811c0e1 100644
--- a/pulsar-client-cpp/lib/stats/ProducerStatsImpl.cc
+++ b/pulsar-client-cpp/lib/stats/ProducerStatsImpl.cc
@@ -93,7 +93,7 @@
totalBytesSent_ += msg.getLength();
}
-void ProducerStatsImpl::messageReceived(Result& res, boost::posix_time::ptime& publishTime) {
+void ProducerStatsImpl::messageReceived(Result res, const boost::posix_time::ptime& publishTime) {
boost::posix_time::ptime currentTime = boost::posix_time::microsec_clock::universal_time();
double diffInMicros = (currentTime - publishTime).total_microseconds();
Lock lock(mutex_);
diff --git a/pulsar-client-cpp/lib/stats/ProducerStatsImpl.h b/pulsar-client-cpp/lib/stats/ProducerStatsImpl.h
index e82628b..27ffacc 100644
--- a/pulsar-client-cpp/lib/stats/ProducerStatsImpl.h
+++ b/pulsar-client-cpp/lib/stats/ProducerStatsImpl.h
@@ -82,7 +82,7 @@
void messageSent(const Message&);
- void messageReceived(Result&, boost::posix_time::ptime&);
+ void messageReceived(Result, const boost::posix_time::ptime&);
~ProducerStatsImpl();
diff --git a/pulsar-client-cpp/test-conf/standalone-ssl.conf b/pulsar-client-cpp/test-conf/standalone-ssl.conf
index 90c4822..5c94e58 100644
--- a/pulsar-client-cpp/test-conf/standalone-ssl.conf
+++ b/pulsar-client-cpp/test-conf/standalone-ssl.conf
@@ -300,4 +300,7 @@
globalZookeeperServers={{ zookeeper_servers }}
# Deprecated. Use brokerDeleteInactiveTopicsFrequencySeconds
-brokerServicePurgeInactiveFrequencyInSeconds=60
\ No newline at end of file
+brokerServicePurgeInactiveFrequencyInSeconds=60
+
+# Given a specific limit of the max message size
+maxMessageSize=1024000
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index be65775..da5c609 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -602,7 +602,7 @@
Result result = client.createProducer(topicName, conf, producer);
ASSERT_EQ(ResultOk, result);
- int size = Commands::DefaultMaxMessageSize + 1000 * 100;
+ int size = ClientConnection::getMaxMessageSize() + 1000 * 100;
char *content = new char[size];
memset(content, 0, size);
Message msg = MessageBuilder().setAllocatedContent(content, size).build();
@@ -610,7 +610,7 @@
ASSERT_EQ(ResultMessageTooBig, result);
// Anything up to MaxMessageSize should be allowed
- size = Commands::DefaultMaxMessageSize;
+ size = ClientConnection::getMaxMessageSize();
msg = MessageBuilder().setAllocatedContent(content, size).build();
result = producer.send(msg);
ASSERT_EQ(ResultOk, result);
@@ -1156,7 +1156,7 @@
result = producerFuture.get(producer2);
ASSERT_EQ(ResultOk, result);
- int size = Commands::DefaultMaxMessageSize + 1000 * 100;
+ int size = ClientConnection::getMaxMessageSize() + 1000 * 100;
char *content = new char[size];
memset(content, 0, size);
Message msg = MessageBuilder().setAllocatedContent(content, size).build();
@@ -1208,7 +1208,7 @@
result = client.createProducer(topicName, conf2, producer2);
ASSERT_EQ(ResultOk, result);
- int size = Commands::DefaultMaxMessageSize + 1000 * 100;
+ int size = ClientConnection::getMaxMessageSize() + 1000 * 100;
char *content = new char[size];
memset(content, 0, size);
Message msg = MessageBuilder().setAllocatedContent(content, size).build();
diff --git a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
index 199b50c..57ed0ec 100644
--- a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
@@ -59,6 +59,8 @@
ASSERT_EQ(conf.isReplicateSubscriptionStateEnabled(), false);
ASSERT_EQ(conf.getProperties().empty(), true);
ASSERT_EQ(conf.getPriorityLevel(), 0);
+ ASSERT_EQ(conf.getMaxPendingChunkedMessage(), 100);
+ ASSERT_EQ(conf.isAutoOldestChunkedMessageOnQueueFull(), false);
}
TEST(ConsumerConfigurationTest, testCustomConfig) {
@@ -139,6 +141,12 @@
conf.setPriorityLevel(1);
ASSERT_EQ(conf.getPriorityLevel(), 1);
+
+ conf.setMaxPendingChunkedMessage(500);
+ ASSERT_EQ(conf.getMaxPendingChunkedMessage(), 500);
+
+ conf.setAutoOldestChunkedMessageOnQueueFull(true);
+ ASSERT_TRUE(conf.isAutoOldestChunkedMessageOnQueueFull());
}
TEST(ConsumerConfigurationTest, testReadCompactPersistentExclusive) {
diff --git a/pulsar-client-cpp/tests/MapCacheTest.cc b/pulsar-client-cpp/tests/MapCacheTest.cc
new file mode 100644
index 0000000..12a89ee
--- /dev/null
+++ b/pulsar-client-cpp/tests/MapCacheTest.cc
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <lib/MapCache.h>
+
+using namespace pulsar;
+
+struct MoveOnlyInt {
+ int x = 0;
+
+ MoveOnlyInt() = default;
+ MoveOnlyInt(int xx) : x(xx) {}
+ MoveOnlyInt(const MoveOnlyInt&) = delete;
+ MoveOnlyInt(MoveOnlyInt&& rhs) noexcept : x(rhs.x) {}
+
+ bool operator=(const MoveOnlyInt& rhs) const { return x == rhs.x; }
+};
+
+TEST(MapCacheTest, testPutIfAbsent) {
+ MapCache<int, MoveOnlyInt> cache;
+
+ ASSERT_NE(cache.putIfAbsent(1, {100}), cache.end());
+ ASSERT_EQ(cache.putIfAbsent(1, {200}), cache.end());
+ auto it = cache.find(1);
+ ASSERT_NE(it, cache.end());
+ ASSERT_EQ(it->second.x, 100);
+
+ cache.remove(1);
+ ASSERT_EQ(cache.find(1), cache.end());
+}
+
+TEST(MapCacheTest, testRemoveOldestValues) {
+ MapCache<int, MoveOnlyInt> cache;
+ cache.putIfAbsent(1, {200});
+ cache.putIfAbsent(2, {210});
+ cache.putIfAbsent(3, {220});
+ ASSERT_EQ(cache.getKeys(), (std::vector<int>{1, 2, 3}));
+
+ std::vector<int> removedValues;
+ cache.removeOldestValues(2, [&removedValues](const int& key, const MoveOnlyInt& value) {
+ removedValues.emplace_back(value.x);
+ });
+ ASSERT_EQ(removedValues, (std::vector<int>{200, 210}));
+
+ ASSERT_EQ(cache.getKeys(), (std::vector<int>{3}));
+ ASSERT_EQ(cache.size(), 1);
+ auto it = cache.find(3);
+ ASSERT_NE(it, cache.end());
+ ASSERT_EQ(it->second.x, 220);
+}
+
+TEST(MapCacheTest, testRemoveAllValues) {
+ MapCache<int, MoveOnlyInt> cache;
+ cache.putIfAbsent(1, {300});
+ cache.putIfAbsent(2, {310});
+ cache.putIfAbsent(3, {320});
+
+ // removeOldestValues works well even if the argument is greater than the size of keys
+ cache.removeOldestValues(10000, nullptr);
+ ASSERT_TRUE(cache.getKeys().empty());
+ ASSERT_EQ(cache.size(), 0);
+}
diff --git a/pulsar-client-cpp/tests/MessageChunkingTest.cc b/pulsar-client-cpp/tests/MessageChunkingTest.cc
new file mode 100644
index 0000000..ae0114c
--- /dev/null
+++ b/pulsar-client-cpp/tests/MessageChunkingTest.cc
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <ctime>
+#include <random>
+
+#include <pulsar/Client.h>
+#include <gtest/gtest.h>
+#include "lib/LogUtils.h"
+#include "PulsarFriend.h"
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+
+// See the `maxMessageSize` config in test-conf/standalone-ssl.conf
+static constexpr size_t maxMessageSize = 1024000;
+
+static std::string toString(CompressionType compressionType) {
+ switch (compressionType) {
+ case CompressionType::CompressionNone:
+ return "None";
+ case CompressionType::CompressionLZ4:
+ return "LZ4";
+ case CompressionType::CompressionZLib:
+ return "ZLib";
+ case CompressionType::CompressionZSTD:
+ return "ZSTD";
+ case CompressionType::CompressionSNAPPY:
+ return "SNAPPY";
+ default:
+ return "Unknown (" + std::to_string(compressionType) + ")";
+ }
+}
+
+inline std::string createLargeMessage() {
+ std::string largeMessage(maxMessageSize * 3, 'a');
+ std::default_random_engine e(time(nullptr));
+ std::uniform_int_distribution<unsigned> u(0, 25);
+ for (size_t i = 0; i < largeMessage.size(); i++) {
+ largeMessage[i] = 'a' + u(e);
+ }
+ return largeMessage;
+}
+
+class MessageChunkingTest : public ::testing::TestWithParam<CompressionType> {
+ public:
+ static std::string largeMessage;
+
+ void TearDown() override { client_.close(); }
+
+ void createProducer(const std::string& topic, Producer& producer) {
+ ProducerConfiguration conf;
+ conf.setBatchingEnabled(false);
+ conf.setChunkingEnabled(true);
+ conf.setCompressionType(GetParam());
+ LOG_INFO("Create producer to topic: " << topic
+ << ", compression: " << toString(conf.getCompressionType()));
+ ASSERT_EQ(ResultOk, client_.createProducer(topic, conf, producer));
+ }
+
+ void createConsumer(const std::string& topic, Consumer& consumer) {
+ ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", consumer));
+ }
+
+ private:
+ Client client_{lookupUrl};
+};
+
+std::string MessageChunkingTest::largeMessage = createLargeMessage();
+
+TEST_F(MessageChunkingTest, testInvalidConfig) {
+ Client client(lookupUrl);
+ ProducerConfiguration conf;
+ conf.setBatchingEnabled(true);
+ conf.setChunkingEnabled(true);
+ Producer producer;
+ ASSERT_THROW(client.createProducer("xxx", conf, producer), std::invalid_argument);
+ client.close();
+}
+
+TEST_P(MessageChunkingTest, testEndToEnd) {
+ const std::string topic =
+ "MessageChunkingTest-EndToEnd-" + toString(GetParam()) + std::to_string(time(nullptr));
+ Consumer consumer;
+ createConsumer(topic, consumer);
+ Producer producer;
+ createProducer(topic, producer);
+
+ constexpr int numMessages = 10;
+
+ std::vector<MessageId> sendMessageIds;
+ for (int i = 0; i < numMessages; i++) {
+ MessageId messageId;
+ ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeMessage).build(), messageId));
+ LOG_INFO("Send " << i << " to " << messageId);
+ sendMessageIds.emplace_back(messageId);
+ }
+
+ Message msg;
+ std::vector<MessageId> receivedMessageIds;
+ for (int i = 0; i < numMessages; i++) {
+ ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
+ LOG_INFO("Receive " << msg.getLength() << " bytes from " << msg.getMessageId());
+ ASSERT_EQ(msg.getDataAsString(), largeMessage);
+ receivedMessageIds.emplace_back(msg.getMessageId());
+ }
+ ASSERT_EQ(receivedMessageIds, sendMessageIds);
+ ASSERT_EQ(receivedMessageIds.front().ledgerId(), receivedMessageIds.front().ledgerId());
+ ASSERT_GT(receivedMessageIds.back().entryId(), numMessages);
+
+ // Verify the cache has been cleared
+ auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer);
+ ASSERT_EQ(chunkedMessageCache.size(), 0);
+}
+
+// The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't have INSTANTIATE_TEST_SUITE_P
+INSTANTIATE_TEST_CASE_P(Pulsar, MessageChunkingTest,
+ ::testing::Values(CompressionNone, CompressionLZ4, CompressionZLib, CompressionZSTD,
+ CompressionSNAPPY));
diff --git a/pulsar-client-cpp/tests/ProducerConfigurationTest.cc b/pulsar-client-cpp/tests/ProducerConfigurationTest.cc
index b88f6e4..5c54129 100644
--- a/pulsar-client-cpp/tests/ProducerConfigurationTest.cc
+++ b/pulsar-client-cpp/tests/ProducerConfigurationTest.cc
@@ -46,6 +46,7 @@
ASSERT_EQ(conf.isEncryptionEnabled(), false);
ASSERT_EQ(conf.getEncryptionKeys(), std::set<std::string>{});
ASSERT_EQ(conf.getProperties().empty(), true);
+ ASSERT_EQ(conf.isChunkingEnabled(), false);
}
class MockMessageRoutingPolicy : public MessageRoutingPolicy {
@@ -129,4 +130,7 @@
conf.setProperty("k1", "v1");
ASSERT_EQ(conf.getProperties()["k1"], "v1");
ASSERT_EQ(conf.hasProperty("k1"), true);
+
+ conf.setChunkingEnabled(true);
+ ASSERT_EQ(conf.isChunkingEnabled(), true);
}
diff --git a/pulsar-client-cpp/tests/ProducerTest.cc b/pulsar-client-cpp/tests/ProducerTest.cc
index 210f013..3ee8def 100644
--- a/pulsar-client-cpp/tests/ProducerTest.cc
+++ b/pulsar-client-cpp/tests/ProducerTest.cc
@@ -26,6 +26,7 @@
#include "lib/Utils.h"
#include "lib/Latch.h"
#include "lib/LogUtils.h"
+#include "lib/ProducerImpl.h"
DECLARE_LOG_OBJECT()
using namespace pulsar;
@@ -240,4 +241,12 @@
client.close();
LOG_INFO("End of run " << run);
}
-}
\ No newline at end of file
+}
+
+TEST(ProducerTest, testGetNumOfChunks) {
+ ASSERT_EQ(ProducerImpl::getNumOfChunks(11, 5), 3);
+ ASSERT_EQ(ProducerImpl::getNumOfChunks(10, 5), 2);
+ ASSERT_EQ(ProducerImpl::getNumOfChunks(8, 5), 2);
+ ASSERT_EQ(ProducerImpl::getNumOfChunks(4, 5), 1);
+ ASSERT_EQ(ProducerImpl::getNumOfChunks(1, 0), 1);
+}
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h
index aed7096..c7aa3a6 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -79,6 +79,12 @@
return std::static_pointer_cast<ConsumerImpl>(consumer.impl_);
}
+ static decltype(ConsumerImpl::chunkedMessageCache_) & getChunkedMessageCache(Consumer consumer) {
+ auto consumerImpl = getConsumerImplPtr(consumer);
+ ConsumerImpl::Lock lock(consumerImpl->chunkProcessMutex_);
+ return consumerImpl->chunkedMessageCache_;
+ }
+
static std::shared_ptr<PartitionedConsumerImpl> getPartitionedConsumerImplPtr(Consumer consumer) {
return std::static_pointer_cast<PartitionedConsumerImpl>(consumer.impl_);
}