[C++] PIP 37: Support large message size (#13627)

* Support configuring chunkingEnabled

* Add limitations when chunking is enabled

* [WIP] serializeAndSendMessage

* Pass TopicName instead of string to ProducerImpl's constructor

* Send messages by chunks

* Check whether the callback should be triggered in sendMessage

* Combine callback and releaseSemaphore

* Wrap the send callback with stats update

* Fix incorrect isValidProducerState

* Fix checksum error when chunks are sent

* Add chunked configs from consumer

* Support consuming chunks

* Fix incorrect concanated payload size

* Add tests for chunking messages

* Fixed tests failure when compression is enabled

* Improve logs

* Refactor chunking related fields and fix memory error

* Fix comments

* Add MapCache class

* Use MapCache to refactor ConsumerImpl

* Verify the chunk cache is cleared

* Fix chunked cache

* Fix CentOS 7 build

* Fix Ubuntu 16.04 build failure

* Fix incompatibility with GTest 1.8.0

* Fix tests

* Fix GCC 5.4 segmentation fault
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index 201eba3..2cdbf47 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -437,6 +437,52 @@
      */
     int getPriorityLevel() const;
 
+    /**
+     * Consumer buffers chunk messages into memory until it receives all the chunks of the original message.
+     * While consuming chunk-messages, chunks from same message might not be contiguous in the stream and they
+     * might be mixed with other messages' chunks. so, consumer has to maintain multiple buffers to manage
+     * chunks coming from different messages. This mainly happens when multiple publishers are publishing
+     * messages on the topic concurrently or publisher failed to publish all chunks of the messages.
+     *
+     * eg: M1-C1, M2-C1, M1-C2, M2-C2
+     * Here, Messages M1-C1 and M1-C2 belong to original message M1, M2-C1 and M2-C2 belong to M2 message.
+     *
+     * Buffering large number of outstanding uncompleted chunked messages can create memory pressure and it
+     * can be guarded by providing this maxPendingChunkedMessage threshold. Once, consumer reaches this
+     * threshold, it drops the outstanding unchunked-messages by silently acking or asking broker to redeliver
+     * later by marking it unacked. See setAutoOldestChunkedMessageOnQueueFull.
+     *
+     * If it's zero, the pending chunked messages will not be limited.
+     *
+     * Default: 100
+     *
+     * @param maxPendingChunkedMessage the number of max pending chunked messages
+     */
+    ConsumerConfiguration& setMaxPendingChunkedMessage(size_t maxPendingChunkedMessage);
+
+    /**
+     * The associated getter of setMaxPendingChunkedMessage
+     */
+    size_t getMaxPendingChunkedMessage() const;
+
+    /**
+     * Buffering large number of outstanding uncompleted chunked messages can create memory pressure and it
+     * can be guarded by providing the maxPendingChunkedMessage threshold. See setMaxPendingChunkedMessage.
+     * Once, consumer reaches this threshold, it drops the outstanding unchunked-messages by silently acking
+     * if autoAckOldestChunkedMessageOnQueueFull is true else it marks them for redelivery.
+     *
+     * Default: false
+     *
+     * @param autoAckOldestChunkedMessageOnQueueFull whether to ack the discarded chunked message
+     */
+    ConsumerConfiguration& setAutoOldestChunkedMessageOnQueueFull(
+        bool autoAckOldestChunkedMessageOnQueueFull);
+
+    /**
+     * The associated getter of setAutoOldestChunkedMessageOnQueueFull
+     */
+    bool isAutoOldestChunkedMessageOnQueueFull() const;
+
     friend class PulsarWrapper;
 
    private:
diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 5c2792a..7c278dd 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -473,6 +473,34 @@
      */
     ProducerConfiguration& setProperties(const std::map<std::string, std::string>& properties);
 
+    /**
+     * If message size is higher than allowed max publish-payload size by broker then enableChunking helps
+     * producer to split message into multiple chunks and publish them to broker separately in order. So, it
+     * allows client to successfully publish large size of messages in pulsar.
+     *
+     * Set it true to enable this feature. If so, you must disable batching (see setBatchingEnabled),
+     * otherwise the producer creation will fail.
+     *
+     * There are some other recommendations when it's enabled:
+     * 1. This features is right now only supported for non-shared subscription and persistent-topic.
+     * 2. It's better to reduce setMaxPendingMessages to avoid producer accupying large amount of memory by
+     * buffered messages.
+     * 3. Set message-ttl on the namespace to cleanup chunked messages. Sometimes due to broker-restart or
+     * publish time, producer might fail to publish entire large message. So, consumer will not be able to
+     * consume and ack those messages.
+     *
+     * Default: false
+     *
+     * @param chunkingEnabled whether chunking is enabled
+     * @return the ProducerConfiguration self
+     */
+    ProducerConfiguration& setChunkingEnabled(bool chunkingEnabled);
+
+    /**
+     * The getter associated with setChunkingEnabled().
+     */
+    bool isChunkingEnabled() const;
+
     friend class PulsarWrapper;
 
    private:
diff --git a/pulsar-client-cpp/lib/BatchMessageContainerBase.cc b/pulsar-client-cpp/lib/BatchMessageContainerBase.cc
index 1201707..e9e6b98 100644
--- a/pulsar-client-cpp/lib/BatchMessageContainerBase.cc
+++ b/pulsar-client-cpp/lib/BatchMessageContainerBase.cc
@@ -74,7 +74,8 @@
         return ResultMessageTooBig;
     }
 
-    opSendMsg.msg_.impl_ = impl;
+    opSendMsg.metadata_ = impl->metadata;
+    opSendMsg.payload_ = impl->payload;
     opSendMsg.sequenceId_ = impl->metadata.sequence_id();
     opSendMsg.producerId_ = producerId_;
     opSendMsg.timeout_ = TimeUtils::now() + milliseconds(producerConfig_.getSendTimeout());
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 3ad6f40..79bc1d7 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -1366,8 +1366,9 @@
 }
 
 void ClientConnection::sendMessageInternal(const OpSendMsg& opSend) {
-    PairSharedBuffer buffer = Commands::newSend(outgoingBuffer_, outgoingCmd_, opSend.producerId_,
-                                                opSend.sequenceId_, getChecksumType(), opSend.msg_);
+    PairSharedBuffer buffer =
+        Commands::newSend(outgoingBuffer_, outgoingCmd_, opSend.producerId_, opSend.sequenceId_,
+                          getChecksumType(), opSend.metadata_, opSend.payload_);
 
     asyncWrite(buffer, customAllocWriteHandler(std::bind(&ClientConnection::handleSendPair,
                                                          shared_from_this(), std::placeholders::_1)));
@@ -1408,8 +1409,9 @@
             assert(any.type() == typeid(OpSendMsg));
 
             const OpSendMsg& op = boost::any_cast<const OpSendMsg&>(any);
-            PairSharedBuffer buffer = Commands::newSend(outgoingBuffer_, outgoingCmd_, op.producerId_,
-                                                        op.sequenceId_, getChecksumType(), op.msg_);
+            PairSharedBuffer buffer =
+                Commands::newSend(outgoingBuffer_, outgoingCmd_, op.producerId_, op.sequenceId_,
+                                  getChecksumType(), op.metadata_, op.payload_);
 
             asyncWrite(buffer, customAllocWriteHandler(std::bind(&ClientConnection::handleSendPair,
                                                                  shared_from_this(), std::placeholders::_1)));
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 8cdaacc..0b07b6e 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -29,6 +29,7 @@
 #include <pulsar/ConsoleLoggerFactory.h>
 #include <boost/algorithm/string/predicate.hpp>
 #include <sstream>
+#include <stdexcept>
 #include <lib/HTTPLookupService.h>
 #include <lib/TopicName.h>
 #include <algorithm>
@@ -147,6 +148,9 @@
 
 void ClientImpl::createProducerAsync(const std::string& topic, ProducerConfiguration conf,
                                      CreateProducerCallback callback) {
+    if (conf.isChunkingEnabled() && conf.getBatchingEnabled()) {
+        throw std::invalid_argument("Batching and chunking of messages can't be enabled together");
+    }
     TopicNamePtr topicName;
     {
         Lock lock(mutex_);
@@ -174,7 +178,7 @@
             producer = std::make_shared<PartitionedProducerImpl>(shared_from_this(), topicName,
                                                                  partitionMetadata->getPartitions(), conf);
         } else {
-            producer = std::make_shared<ProducerImpl>(shared_from_this(), topicName->toString(), conf);
+            producer = std::make_shared<ProducerImpl>(shared_from_this(), *topicName, conf);
         }
         producer->getProducerCreatedFuture().addListener(
             std::bind(&ClientImpl::handleProducerCreated, shared_from_this(), std::placeholders::_1,
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 1094efb..472c15b 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -141,10 +141,8 @@
 }
 
 PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, uint64_t producerId,
-                                   uint64_t sequenceId, ChecksumType checksumType, const Message& msg) {
-    const proto::MessageMetadata& metadata = msg.impl_->metadata;
-    SharedBuffer& payload = msg.impl_->payload;
-
+                                   uint64_t sequenceId, ChecksumType checksumType,
+                                   const proto::MessageMetadata& metadata, const SharedBuffer& payload) {
     cmd.set_type(BaseCommand::SEND);
     CommandSend* send = cmd.mutable_send();
     send->set_producer_id(producerId);
@@ -152,6 +150,9 @@
     if (metadata.has_num_messages_in_batch()) {
         send->set_num_messages(metadata.num_messages_in_batch());
     }
+    if (metadata.has_chunk_id()) {
+        send->set_is_chunk(true);
+    }
 
     // / Wire format
     // [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
@@ -199,7 +200,8 @@
         int metadataStartIndex = checksumReaderIndex + checksumSize;
         uint32_t metadataChecksum =
             computeChecksum(0, headers.data() + metadataStartIndex, (writeIndex - metadataStartIndex));
-        uint32_t computedChecksum = computeChecksum(metadataChecksum, payload.data(), payload.writerIndex());
+        uint32_t computedChecksum =
+            computeChecksum(metadataChecksum, payload.data(), payload.readableBytes());
         // set computed checksum
         headers.setWriterIndex(checksumReaderIndex);
         headers.writeUnsignedInt(computedChecksum);
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index e720571..bab2211 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -80,7 +80,8 @@
                                   const std::string& listenerName);
 
     static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& cmd, uint64_t producerId,
-                                    uint64_t sequenceId, ChecksumType checksumType, const Message& msg);
+                                    uint64_t sequenceId, ChecksumType checksumType,
+                                    const proto::MessageMetadata& metadata, const SharedBuffer& payload);
 
     static SharedBuffer newSubscribe(const std::string& topic, const std::string& subscription,
                                      uint64_t consumerId, uint64_t requestId,
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index b01e4e5..d13cb0e 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -231,4 +231,21 @@
 
 KeySharedPolicy ConsumerConfiguration::getKeySharedPolicy() const { return impl_->keySharedPolicy; }
 
+ConsumerConfiguration& ConsumerConfiguration::setMaxPendingChunkedMessage(size_t maxPendingChunkedMessage) {
+    impl_->maxPendingChunkedMessage = maxPendingChunkedMessage;
+    return *this;
+}
+
+size_t ConsumerConfiguration::getMaxPendingChunkedMessage() const { return impl_->maxPendingChunkedMessage; }
+
+ConsumerConfiguration& ConsumerConfiguration::setAutoOldestChunkedMessageOnQueueFull(
+    bool autoAckOldestChunkedMessageOnQueueFull) {
+    impl_->autoAckOldestChunkedMessageOnQueueFull = autoAckOldestChunkedMessageOnQueueFull;
+    return *this;
+}
+
+bool ConsumerConfiguration::isAutoOldestChunkedMessageOnQueueFull() const {
+    return impl_->autoAckOldestChunkedMessageOnQueueFull;
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 75f65a7..9c2a461 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -50,6 +50,8 @@
     std::map<std::string, std::string> properties;
     int priorityLevel{0};
     KeySharedPolicy keySharedPolicy;
+    size_t maxPendingChunkedMessage{100};
+    bool autoAckOldestChunkedMessageOnQueueFull{false};
 };
 }  // namespace pulsar
 #endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 77c0fa9..4d4a135 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -63,7 +63,9 @@
       negativeAcksTracker_(client, *this, conf),
       ackGroupingTrackerPtr_(std::make_shared<AckGroupingTracker>()),
       readCompacted_(conf.isReadCompacted()),
-      lastMessageInBroker_(Optional<MessageId>::of(MessageId())) {
+      lastMessageInBroker_(Optional<MessageId>::of(MessageId())),
+      maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
+      autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoOldestChunkedMessageOnQueueFull()) {
     std::stringstream consumerStrStream;
     consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << consumerId_ << "] ";
     consumerStr_ = consumerStrStream.str();
@@ -308,6 +310,80 @@
     callback(result);
 }
 
+Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload,
+                                                         const proto::MessageMetadata& metadata,
+                                                         const MessageId& messageId,
+                                                         const proto::MessageIdData& messageIdData,
+                                                         const ClientConnectionPtr& cnx) {
+    const auto chunkId = metadata.chunk_id();
+    const auto uuid = metadata.uuid();
+    LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " << uuid
+                                                 << ", messageId: " << messageId << ") of "
+                                                 << payload.readableBytes() << " bytes");
+
+    Lock lock(chunkProcessMutex_);
+    auto it = chunkedMessageCache_.find(uuid);
+
+    if (chunkId == 0) {
+        if (it == chunkedMessageCache_.end()) {
+            it = chunkedMessageCache_.putIfAbsent(
+                uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()});
+        }
+        if (maxPendingChunkedMessage_ > 0 && chunkedMessageCache_.size() >= maxPendingChunkedMessage_) {
+            chunkedMessageCache_.removeOldestValues(
+                chunkedMessageCache_.size() - maxPendingChunkedMessage_ + 1,
+                [this, messageId](const std::string& uuid, const ChunkedMessageCtx& ctx) {
+                    if (autoAckOldestChunkedMessageOnQueueFull_) {
+                        doAcknowledgeIndividual(messageId, [uuid, messageId](Result result) {
+                            if (result != ResultOk) {
+                                LOG_WARN("Failed to acknowledge discarded chunk, uuid: "
+                                         << uuid << ", messageId: " << messageId);
+                            }
+                        });
+                    } else {
+                        trackMessage(messageId);
+                    }
+                });
+            it = chunkedMessageCache_.putIfAbsent(
+                uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()});
+        }
+    }
+
+    auto& chunkedMsgCtx = it->second;
+    if (it == chunkedMessageCache_.end() || !chunkedMsgCtx.validateChunkId(chunkId)) {
+        if (it == chunkedMessageCache_.end()) {
+            LOG_ERROR("Received an uncached chunk (uuid: " << uuid << " chunkId: " << chunkId
+                                                           << ", messageId: " << messageId << ")");
+        } else {
+            LOG_ERROR("Received a chunk whose chunk id is invalid (uuid: "
+                      << uuid << " chunkId: " << chunkId << ", messageId: " << messageId << ")");
+            chunkedMessageCache_.remove(uuid);
+        }
+        lock.unlock();
+        increaseAvailablePermits(cnx);
+        trackMessage(messageId);
+        return Optional<SharedBuffer>::empty();
+    }
+
+    chunkedMsgCtx.appendChunk(messageId, payload);
+    if (!chunkedMsgCtx.isCompleted()) {
+        lock.unlock();
+        increaseAvailablePermits(cnx);
+        return Optional<SharedBuffer>::empty();
+    }
+
+    LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx
+                                                    << ", sequenceId: " << metadata.sequence_id());
+
+    auto wholePayload = chunkedMsgCtx.getBuffer();
+    chunkedMessageCache_.remove(uuid);
+    if (uncompressMessageIfNeeded(cnx, messageIdData, metadata, wholePayload, false)) {
+        return Optional<SharedBuffer>::of(wholePayload);
+    } else {
+        return Optional<SharedBuffer>::empty();
+    }
+}
+
 void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
                                    bool& isChecksumValid, proto::MessageMetadata& metadata,
                                    SharedBuffer& payload) {
@@ -318,17 +394,39 @@
         return;
     }
 
-    if (!uncompressMessageIfNeeded(cnx, msg, metadata, payload)) {
-        // Message was discarded on decompression error
-        return;
-    }
-
     if (!isChecksumValid) {
         // Message discarded for checksum error
         discardCorruptedMessage(cnx, msg.message_id(), proto::CommandAck::ChecksumMismatch);
         return;
     }
 
+    const bool isMessageDecryptable =
+        metadata.encryption_keys_size() <= 0 || config_.getCryptoKeyReader().get() ||
+        config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME;
+
+    const bool isChunkedMessage = metadata.num_chunks_from_msg() > 1 &&
+                                  config_.getConsumerType() != ConsumerType::ConsumerShared &&
+                                  config_.getConsumerType() != ConsumerType::ConsumerKeyShared;
+    if (isMessageDecryptable && !isChunkedMessage) {
+        if (!uncompressMessageIfNeeded(cnx, msg.message_id(), metadata, payload, true)) {
+            // Message was discarded on decompression error
+            return;
+        }
+    }
+
+    // Only a non-batched messages can be a chunk
+    if (!metadata.has_num_messages_in_batch() && isChunkedMessage) {
+        const auto& messageIdData = msg.message_id();
+        MessageId messageId(messageIdData.partition(), messageIdData.ledgerid(), messageIdData.entryid(),
+                            messageIdData.batch_index());
+        auto optionalPayload = processMessageChunk(payload, metadata, messageId, messageIdData, cnx);
+        if (optionalPayload.is_present()) {
+            payload = optionalPayload.value();
+        } else {
+            return;
+        }
+    }
+
     Message m(msg, metadata, payload, partitionIndex_);
     m.impl_->cnx_ = cnx.get();
     m.impl_->setTopicName(topic_);
@@ -528,8 +626,10 @@
     return false;
 }
 
-bool ConsumerImpl::uncompressMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
-                                             const proto::MessageMetadata& metadata, SharedBuffer& payload) {
+bool ConsumerImpl::uncompressMessageIfNeeded(const ClientConnectionPtr& cnx,
+                                             const proto::MessageIdData& messageIdData,
+                                             const proto::MessageMetadata& metadata, SharedBuffer& payload,
+                                             bool checkMaxMessageSize) {
     if (!metadata.has_compression()) {
         return true;
     }
@@ -539,12 +639,11 @@
     uint32_t uncompressedSize = metadata.uncompressed_size();
     uint32_t payloadSize = payload.readableBytes();
     if (cnx) {
-        if (payloadSize > ClientConnection::getMaxMessageSize()) {
+        if (checkMaxMessageSize && payloadSize > ClientConnection::getMaxMessageSize()) {
             // Uncompressed size is itself corrupted since it cannot be bigger than the MaxMessageSize
             LOG_ERROR(getName() << "Got corrupted payload message size " << payloadSize  //
-                                << " at  " << msg.message_id().ledgerid() << ":"
-                                << msg.message_id().entryid());
-            discardCorruptedMessage(cnx, msg.message_id(), proto::CommandAck::UncompressedSizeCorruption);
+                                << " at  " << messageIdData.ledgerid() << ":" << messageIdData.entryid());
+            discardCorruptedMessage(cnx, messageIdData, proto::CommandAck::UncompressedSizeCorruption);
             return false;
         }
     } else {
@@ -554,8 +653,8 @@
 
     if (!CompressionCodecProvider::getCodec(compressionType).decode(payload, uncompressedSize, payload)) {
         LOG_ERROR(getName() << "Failed to decompress message with " << uncompressedSize  //
-                            << " at  " << msg.message_id().ledgerid() << ":" << msg.message_id().entryid());
-        discardCorruptedMessage(cnx, msg.message_id(), proto::CommandAck::DecompressionError);
+                            << " at  " << messageIdData.ledgerid() << ":" << messageIdData.entryid());
+        discardCorruptedMessage(cnx, messageIdData, proto::CommandAck::DecompressionError);
         return false;
     }
 
@@ -584,7 +683,7 @@
         // This will only happen when the connection got reset and we cleared the queue
         return;
     }
-    trackMessage(msg);
+    trackMessage(msg.getMessageId());
     try {
         consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
         lastDequedMessage_ = Optional<MessageId>::of(msg.getMessageId());
@@ -732,7 +831,7 @@
 
     increaseAvailablePermits(currentCnx);
     if (track) {
-        trackMessage(msg);
+        trackMessage(msg.getMessageId());
     }
 }
 
@@ -1240,11 +1339,11 @@
     negativeAcksTracker_.setEnabledForTesting(enabled);
 }
 
-void ConsumerImpl::trackMessage(const Message& msg) {
+void ConsumerImpl::trackMessage(const MessageId& messageId) {
     if (hasParent_) {
-        unAckedMessageTrackerPtr_->remove(msg.getMessageId());
+        unAckedMessageTrackerPtr_->remove(messageId);
     } else {
-        unAckedMessageTrackerPtr_->add(msg.getMessageId());
+        unAckedMessageTrackerPtr_->add(messageId);
     }
 }
 
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index 0754a89..2bdb82f 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -40,6 +40,7 @@
 #include "BatchAcknowledgementTracker.h"
 #include <limits>
 #include <lib/BrokerConsumerStatsImpl.h>
+#include <lib/MapCache.h>
 #include <lib/stats/ConsumerStatsImpl.h>
 #include <lib/stats/ConsumerStatsDisabled.h>
 #include <queue>
@@ -157,8 +158,9 @@
 
    private:
     bool waitingForZeroQueueSizeMessage;
-    bool uncompressMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
-                                   const proto::MessageMetadata& metadata, SharedBuffer& payload);
+    bool uncompressMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::MessageIdData& messageIdData,
+                                   const proto::MessageMetadata& metadata, SharedBuffer& payload,
+                                   bool checkMaxMessageSize);
     void discardCorruptedMessage(const ClientConnectionPtr& cnx, const proto::MessageIdData& messageId,
                                  proto::CommandAck::ValidationError validationError);
     void increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int delta = 1);
@@ -177,7 +179,7 @@
     void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);
     void failPendingReceiveCallback();
     void setNegativeAcknowledgeEnabledForTesting(bool enabled) override;
-    void trackMessage(const Message& msg);
+    void trackMessage(const MessageId& messageId);
 
     Optional<MessageId> clearReceiveQueue();
 
@@ -227,6 +229,83 @@
         return lastMessageInBroker_.is_present() ? lastMessageInBroker_.value() : MessageId::earliest();
     }
 
+    class ChunkedMessageCtx {
+       public:
+        ChunkedMessageCtx() : totalChunks_(0) {}
+        ChunkedMessageCtx(int totalChunks, int totalChunkMessageSize)
+            : totalChunks_(totalChunks), chunkedMsgBuffer_(SharedBuffer::allocate(totalChunkMessageSize)) {
+            chunkedMessageIds_.reserve(totalChunks);
+        }
+
+        ChunkedMessageCtx(const ChunkedMessageCtx&) = delete;
+        // Here we don't use =default to be compatible with GCC 4.8
+        ChunkedMessageCtx(ChunkedMessageCtx&& rhs) noexcept
+            : totalChunks_(rhs.totalChunks_),
+              chunkedMsgBuffer_(std::move(rhs.chunkedMsgBuffer_)),
+              chunkedMessageIds_(std::move(rhs.chunkedMessageIds_)) {}
+
+        bool validateChunkId(int chunkId) const noexcept { return chunkId == numChunks(); }
+
+        void appendChunk(const MessageId& messageId, const SharedBuffer& payload) {
+            chunkedMessageIds_.emplace_back(messageId);
+            chunkedMsgBuffer_.write(payload.data(), payload.readableBytes());
+        }
+
+        bool isCompleted() const noexcept { return totalChunks_ == numChunks(); }
+
+        const SharedBuffer& getBuffer() const noexcept { return chunkedMsgBuffer_; }
+
+        const std::vector<MessageId>& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; }
+
+        friend std::ostream& operator<<(std::ostream& os, const ChunkedMessageCtx& ctx) {
+            return os << "ChunkedMessageCtx " << ctx.chunkedMsgBuffer_.readableBytes() << " of "
+                      << ctx.chunkedMsgBuffer_.writerIndex() << " bytes, " << ctx.numChunks() << " of "
+                      << ctx.totalChunks_ << " chunks";
+        }
+
+       private:
+        const int totalChunks_;
+        SharedBuffer chunkedMsgBuffer_;
+        std::vector<MessageId> chunkedMessageIds_;
+
+        int numChunks() const noexcept { return static_cast<int>(chunkedMessageIds_.size()); }
+    };
+
+    const size_t maxPendingChunkedMessage_;
+    // if queue size is reasonable (most of the time equal to number of producers try to publish messages
+    // concurrently on the topic) then it guards against broken chunked message which was not fully published
+    const bool autoAckOldestChunkedMessageOnQueueFull_;
+
+    // The key is UUID, value is the associated ChunkedMessageCtx of the chunked message.
+    std::unordered_map<std::string, ChunkedMessageCtx> chunkedMessagesMap_;
+    // This list contains all the keys of `chunkedMessagesMap_`, each key is an UUID that identifies a pending
+    // chunked message. Once the number of pending chunked messages exceeds the limit, the oldest UUIDs and
+    // the associated ChunkedMessageCtx will be removed.
+    std::list<std::string> pendingChunkedMessageUuidQueue_;
+
+    // The key is UUID, value is the associated ChunkedMessageCtx of the chunked message.
+    MapCache<std::string, ChunkedMessageCtx> chunkedMessageCache_;
+    mutable std::mutex chunkProcessMutex_;
+
+    /**
+     * Process a chunk. If the chunk is the last chunk of a message, concatenate all buffered chunks into the
+     * payload and return it.
+     *
+     * @param payload the payload of a chunk
+     * @param metadata the message metadata
+     * @param messageId
+     * @param messageIdData
+     * @param cnx
+     *
+     * @return the concatenated payload if chunks are concatenated into a completed message payload
+     *   successfully, else Optional::empty()
+     */
+    Optional<SharedBuffer> processMessageChunk(const SharedBuffer& payload,
+                                               const proto::MessageMetadata& metadata,
+                                               const MessageId& messageId,
+                                               const proto::MessageIdData& messageIdData,
+                                               const ClientConnectionPtr& cnx);
+
     friend class PulsarFriend;
 
     // these two declared friend to access setNegativeAcknowledgeEnabledForTesting
diff --git a/pulsar-client-cpp/lib/MapCache.h b/pulsar-client-cpp/lib/MapCache.h
new file mode 100644
index 0000000..b9a0069
--- /dev/null
+++ b/pulsar-client-cpp/lib/MapCache.h
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <deque>
+#include <functional>
+#include <unordered_map>
+#include <vector>
+
+namespace pulsar {
+
+// A map cache that supports removing the first N oldest entries from the map.
+// Value must be moveable and have the default constructor.
+template <typename Key, typename Value>
+class MapCache {
+    std::unordered_map<Key, Value> map_;
+    std::deque<Key> keys_;
+
+   public:
+    using const_iterator = typename decltype(map_)::const_iterator;
+    using iterator = typename decltype(map_)::iterator;
+
+    MapCache() = default;
+    // Here we don't use =default to be compatible with GCC 4.8
+    MapCache(MapCache&& rhs) noexcept : map_(std::move(rhs.map_)), keys_(std::move(rhs.keys_)) {}
+
+    size_t size() const noexcept { return map_.size(); }
+
+    const_iterator find(const Key& key) const { return map_.find(key); }
+    iterator find(const Key& key) { return map_.find(key); }
+
+    const_iterator end() const noexcept { return map_.end(); }
+    iterator end() noexcept { return map_.end(); }
+
+    iterator putIfAbsent(const Key& key, Value&& value) {
+        auto it = map_.find(key);
+        if (it == map_.end()) {
+            keys_.push_back(key);
+            return map_.emplace(key, std::move(value)).first;
+        } else {
+            return end();
+        }
+    }
+
+    void removeOldestValues(size_t numToRemove,
+                            const std::function<void(const Key&, const Value&)>& callback) {
+        for (size_t i = 0; !keys_.empty() && i < numToRemove; i++) {
+            const auto key = keys_.front();
+            auto it = map_.find(key);
+            if (it != map_.end()) {
+                if (callback) {
+                    callback(it->first, it->second);
+                }
+                map_.erase(it);
+            }
+            keys_.pop_front();
+        }
+    }
+
+    void remove(const Key& key) {
+        auto it = map_.find(key);
+        if (it != map_.end()) {
+            removeKeyFromKeys(key);
+            map_.erase(it);
+        }
+    }
+
+    // Following methods are only used for tests
+    std::vector<Key> getKeys() const {
+        std::vector<Key> keys;
+        for (auto key : keys_) {
+            keys.emplace_back(key);
+        }
+        return keys;
+    }
+
+   private:
+    void removeKeyFromKeys(const Key& key) {
+        for (auto it = keys_.begin(); it != keys_.end(); ++it) {
+            if (*it == key) {
+                keys_.erase(it);
+                break;
+            }
+        }
+    }
+};
+
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/MemoryLimitController.cc b/pulsar-client-cpp/lib/MemoryLimitController.cc
index 81578c9..4a23f8b 100644
--- a/pulsar-client-cpp/lib/MemoryLimitController.cc
+++ b/pulsar-client-cpp/lib/MemoryLimitController.cc
@@ -25,6 +25,10 @@
     : memoryLimit_(memoryLimit), currentUsage_(0), mutex_(), condition_() {}
 
 bool MemoryLimitController::tryReserveMemory(uint64_t size) {
+    // Avoid CAS operation when size is 0
+    if (size == 0) {
+        return true;
+    }
     while (true) {
         uint64_t current = currentUsage_;
         uint64_t newUsage = current + size;
@@ -66,4 +70,4 @@
 
 uint64_t MemoryLimitController::currentUsage() const { return currentUsage_; }
 
-}  // namespace pulsar
\ No newline at end of file
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/OpSendMsg.h b/pulsar-client-cpp/lib/OpSendMsg.h
index 70b880c..365301b 100644
--- a/pulsar-client-cpp/lib/OpSendMsg.h
+++ b/pulsar-client-cpp/lib/OpSendMsg.h
@@ -29,7 +29,8 @@
 namespace pulsar {
 
 struct OpSendMsg {
-    Message msg_;
+    proto::MessageMetadata metadata_;
+    SharedBuffer payload_;
     SendCallback sendCallback_;
     uint64_t producerId_;
     uint64_t sequenceId_;
@@ -39,15 +40,24 @@
 
     OpSendMsg() = default;
 
-    OpSendMsg(const Message& msg, const SendCallback& sendCallback, uint64_t producerId, uint64_t sequenceId,
-              int sendTimeoutMs, uint32_t messagesCount, uint64_t messagesSize)
-        : msg_(msg),
+    OpSendMsg(const proto::MessageMetadata& metadata, const SharedBuffer& payload,
+              const SendCallback& sendCallback, uint64_t producerId, uint64_t sequenceId, int sendTimeoutMs,
+              uint32_t messagesCount, uint64_t messagesSize)
+        : metadata_(metadata),  // the copy happens here because OpSendMsg of chunks are constructed with the
+                                // a shared metadata object
+          payload_(payload),
           sendCallback_(sendCallback),
           producerId_(producerId),
           sequenceId_(sequenceId),
           timeout_(TimeUtils::now() + milliseconds(sendTimeoutMs)),
           messagesCount_(messagesCount),
           messagesSize_(messagesSize) {}
+
+    void complete(Result result, const MessageId& messageId) const {
+        if (sendCallback_) {
+            sendCallback_(result, messageId);
+        }
+    }
 };
 
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index bdd23ed..4f8be3b 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -86,8 +86,7 @@
 
 ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition, bool lazy) {
     using namespace std::placeholders;
-    std::string topicPartitionName = topicName_->getTopicPartitionName(partition);
-    auto producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_, partition);
+    auto producer = std::make_shared<ProducerImpl>(client_, *topicName_, conf_, partition);
 
     if (lazy) {
         createLazyPartitionProducer(partition);
@@ -97,7 +96,7 @@
                       const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));
     }
 
-    LOG_DEBUG("Creating Producer for single Partition - " << topicPartitionName);
+    LOG_DEBUG("Creating Producer for single Partition - " << topicName_ << "-partition-" << partition);
     return producer;
 }
 
diff --git a/pulsar-client-cpp/lib/ProducerConfiguration.cc b/pulsar-client-cpp/lib/ProducerConfiguration.cc
index 3e027ee..0ee38ca 100644
--- a/pulsar-client-cpp/lib/ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ProducerConfiguration.cc
@@ -251,4 +251,11 @@
     return *this;
 }
 
+ProducerConfiguration& ProducerConfiguration::setChunkingEnabled(bool chunkingEnabled) {
+    impl_->chunkingEnabled = chunkingEnabled;
+    return *this;
+}
+
+bool ProducerConfiguration::isChunkingEnabled() const { return impl_->chunkingEnabled; }
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
index a41b250..2ac1eba 100644
--- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
@@ -48,6 +48,7 @@
     std::set<std::string> encryptionKeys;
     ProducerCryptoFailureAction cryptoFailureAction{ProducerCryptoFailureAction::FAIL};
     std::map<std::string, std::string> properties;
+    bool chunkingEnabled{false};
 };
 }  // namespace pulsar
 
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index f81e205..87f8552 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -37,15 +37,15 @@
 
     void complete(Result result) {
         for (const auto& opSendMsg : opSendMsgs) {
-            opSendMsg.sendCallback_(result, opSendMsg.msg_.getMessageId());
+            opSendMsg.complete(result, {});
         }
     }
 };
 
-ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const ProducerConfiguration& conf,
-                           int32_t partition)
+ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
+                           const ProducerConfiguration& conf, int32_t partition)
     : HandlerBase(
-          client, topic,
+          client, (partition < 0) ? topicName.toString() : topicName.getTopicPartitionName(partition),
           Backoff(milliseconds(100), seconds(60), milliseconds(std::max(100, conf.getSendTimeout() - 100)))),
       conf_(conf),
       semaphore_(),
@@ -57,7 +57,8 @@
       producerId_(client->newProducerId()),
       msgSequenceGenerator_(0),
       dataKeyGenIntervalSec_(4 * 60 * 60),
-      memoryLimitController_(client->getMemoryLimitController()) {
+      memoryLimitController_(client->getMemoryLimitController()),
+      chunkingEnabled_(conf_.isChunkingEnabled() && topicName.isPersistent() && !conf_.getBatchingEnabled()) {
     LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic_
                                 << " id: " << producerId_);
 
@@ -323,14 +324,6 @@
     }
 }
 
-void ProducerImpl::statsCallBackHandler(Result res, const MessageId& msgId, SendCallback callback,
-                                        boost::posix_time::ptime publishTime) {
-    producerStatsBasePtr_->messageReceived(res, publishTime);
-    if (callback) {
-        callback(res, msgId);
-    }
-}
-
 void ProducerImpl::flushAsync(FlushCallback callback) {
     if (batchMessageContainer_) {
         Lock lock(mutex_);
@@ -358,44 +351,61 @@
     }
 }
 
+bool ProducerImpl::isValidProducerState(const SendCallback& callback) const {
+    Lock lock(mutex_);
+    const auto state = state_;
+    lock.unlock();
+    switch (state) {
+        case HandlerBase::Ready:
+            // OK
+        case HandlerBase::Pending:
+            // We are OK to queue the messages on the client, it will be sent to the broker once we get the
+            // connection
+            return true;
+        case HandlerBase::Closing:
+        case HandlerBase::Closed:
+            callback(ResultAlreadyClosed, {});
+            return false;
+        case HandlerBase::NotStarted:
+        case HandlerBase::Failed:
+        default:
+            callback(ResultNotConnected, {});
+            return false;
+    }
+}
+
+bool ProducerImpl::canAddToBatch(const Message& msg) const {
+    // If a message has a delayed delivery time, we'll always send it individually
+    return batchMessageContainer_.get() && !msg.impl_->metadata.has_deliver_at_time();
+}
+
+static SharedBuffer applyCompression(const SharedBuffer& uncompressedPayload,
+                                     CompressionType compressionType) {
+    return CompressionCodecProvider::getCodec(compressionType).encode(uncompressedPayload);
+}
+
 void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
     producerStatsBasePtr_->messageSent(msg);
-    SendCallback cb =
-        std::bind(&ProducerImpl::statsCallBackHandler, shared_from_this(), std::placeholders::_1,
-                  std::placeholders::_2, callback, boost::posix_time::microsec_clock::universal_time());
 
-    // Compress the payload if required
-    SharedBuffer& payload = msg.impl_->payload;
-
-    uint32_t uncompressedSize = payload.readableBytes();
-    uint32_t payloadSize = uncompressedSize;
-    ClientConnectionPtr cnx = getCnx().lock();
-    if (!batchMessageContainer_) {
-        // If batching is enabled we compress all the payloads together before sending the batch
-        payload = CompressionCodecProvider::getCodec(conf_.getCompressionType()).encode(payload);
-        payloadSize = payload.readableBytes();
-
-        // Encrypt the payload if enabled
-        SharedBuffer encryptedPayload;
-        if (!encryptMessage(msg.impl_->metadata, payload, encryptedPayload)) {
-            cb(ResultCryptoError, msg.getMessageId());
-            return;
+    const auto now = boost::posix_time::microsec_clock::universal_time();
+    auto self = shared_from_this();
+    sendAsyncWithStatsUpdate(msg, [this, self, now, callback](Result result, const MessageId& messageId) {
+        producerStatsBasePtr_->messageReceived(result, now);
+        if (callback) {
+            callback(result, messageId);
         }
-        payload = encryptedPayload;
+    });
+}
 
-        if (payloadSize > ClientConnection::getMaxMessageSize()) {
-            LOG_DEBUG(getName() << " - compressed Message payload size" << payloadSize << "cannot exceed "
-                                << ClientConnection::getMaxMessageSize() << " bytes");
-            cb(ResultMessageTooBig, msg.getMessageId());
-            return;
-        }
+void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallback& callback) {
+    if (!isValidProducerState(callback)) {
+        return;
     }
 
-    // Reserve a spot in the messages queue before acquiring the ProducerImpl
-    // mutex. When the queue is full, this call will block until a spot is
-    // available.
-    Result res = canEnqueueRequest(payloadSize);
-    if (res != ResultOk) {
+    const auto& uncompressedPayload = msg.impl_->payload;
+    const uint32_t uncompressedSize = uncompressedPayload.readableBytes();
+    const auto result = canEnqueueRequest(uncompressedSize);
+    if (result != ResultOk) {
         // If queue is full sending the batch immediately, no point waiting till batchMessagetimeout
         if (batchMessageContainer_) {
             LOG_DEBUG(getName() << " - sending batch message immediately");
@@ -405,43 +415,64 @@
             failures.complete();
         }
 
-        cb(res, msg.getMessageId());
+        callback(result, {});
         return;
     }
 
+    // We have already reserved a spot, so if we need to early return for failed result, we should release the
+    // semaphore and memory first.
+    const auto handleFailedResult = [this, uncompressedSize, callback](Result result) {
+        releaseSemaphore(uncompressedSize);  // it releases the memory as well
+        callback(result, {});
+    };
+
+    const bool compressed = !canAddToBatch(msg);
+    const auto payload =
+        compressed ? applyCompression(uncompressedPayload, conf_.getCompressionType()) : uncompressedPayload;
+    const auto compressedSize = static_cast<uint32_t>(payload.readableBytes());
+    const auto maxMessageSize = static_cast<uint32_t>(ClientConnection::getMaxMessageSize());
+
+    if (compressed && compressedSize > ClientConnection::getMaxMessageSize() && !chunkingEnabled_) {
+        LOG_WARN(getName() << " - compressed Message payload size " << payload.readableBytes()
+                           << " cannot exceed " << ClientConnection::getMaxMessageSize()
+                           << " bytes unless chunking is enabled");
+        handleFailedResult(ResultMessageTooBig);
+        return;
+    }
+
+    auto& msgMetadata = msg.impl_->metadata;
+    if (!msgMetadata.has_replicated_from() && msgMetadata.has_producer_name()) {
+        handleFailedResult(ResultInvalidMessage);
+        return;
+    }
+
+    const int totalChunks =
+        canAddToBatch(msg) ? 1 : getNumOfChunks(compressedSize, ClientConnection::getMaxMessageSize());
+    // Each chunk should be sent individually, so try to acquire extra permits for chunks.
+    for (int i = 0; i < (totalChunks - 1); i++) {
+        const auto result = canEnqueueRequest(0);  // size is 0 because the memory has already reserved
+        if (result != ResultOk) {
+            handleFailedResult(result);
+            return;
+        }
+    }
+
     Lock lock(mutex_);
-    // producers may be lazily starting and be in the pending state
-    if (state_ != Ready && state_ != Pending) {
-        lock.unlock();
-        releaseSemaphore(payloadSize);
-        cb(ResultAlreadyClosed, msg.getMessageId());
-        return;
-    }
-
-    if (msg.impl_->metadata.has_producer_name()) {
-        // Message had already been sent before
-        lock.unlock();
-        releaseSemaphore(payloadSize);
-        cb(ResultInvalidMessage, msg.getMessageId());
-        return;
-    }
-
     uint64_t sequenceId;
-    if (!msg.impl_->metadata.has_sequence_id()) {
+    if (!msgMetadata.has_sequence_id()) {
         sequenceId = msgSequenceGenerator_++;
     } else {
-        sequenceId = msg.impl_->metadata.sequence_id();
+        sequenceId = msgMetadata.sequence_id();
     }
     setMessageMetadata(msg, sequenceId, uncompressedSize);
 
-    // If we reach this point then you have a reserved spot on the queue
-    if (batchMessageContainer_ && !msg.impl_->metadata.has_deliver_at_time()) {
+    if (canAddToBatch(msg)) {
         // Batching is enabled and the message is not delayed
         if (!batchMessageContainer_->hasEnoughSpace(msg)) {
             batchMessageAndSend().complete();
         }
         bool isFirstMessage = batchMessageContainer_->isFirstMessageToAdd(msg);
-        bool isFull = batchMessageContainer_->add(msg, cb);
+        bool isFull = batchMessageContainer_->add(msg, callback);
         if (isFirstMessage) {
             batchTimer_->expires_from_now(
                 boost::posix_time::milliseconds(conf_.getBatchingMaxPublishDelayMs()));
@@ -455,10 +486,42 @@
             failures.complete();
         }
     } else {
-        sendMessage(OpSendMsg{msg, cb, producerId_, sequenceId, conf_.getSendTimeout(), 1, payloadSize});
+        const bool sendChunks = (totalChunks > 1);
+        if (sendChunks) {
+            msgMetadata.set_uuid(producerName_ + "-" + std::to_string(sequenceId));
+            msgMetadata.set_num_chunks_from_msg(totalChunks);
+            msgMetadata.set_total_chunk_msg_size(compressedSize);
+        }
+
+        int beginIndex = 0;
+        for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
+            if (sendChunks) {
+                msgMetadata.set_chunk_id(chunkId);
+            }
+            const uint32_t endIndex = std::min(compressedSize, beginIndex + maxMessageSize);
+            auto chunkedPayload = payload.slice(beginIndex, endIndex - beginIndex);
+            beginIndex = endIndex;
+
+            SharedBuffer encryptedPayload;
+            if (!encryptMessage(msgMetadata, chunkedPayload, encryptedPayload)) {
+                handleFailedResult(ResultCryptoError);
+                return;
+            }
+
+            sendMessage(OpSendMsg{msgMetadata, encryptedPayload,
+                                  (chunkId == totalChunks - 1) ? callback : nullptr, producerId_, sequenceId,
+                                  conf_.getSendTimeout(), 1, uncompressedSize});
+        }
     }
 }
 
+int ProducerImpl::getNumOfChunks(uint32_t size, uint32_t maxMessageSize) {
+    if (size >= maxMessageSize && maxMessageSize != 0) {
+        return size / maxMessageSize + ((size % maxMessageSize == 0) ? 0 : 1);
+    }
+    return 1;
+}
+
 Result ProducerImpl::canEnqueueRequest(uint32_t payloadSize) {
     if (conf_.getBlockIfQueueFull()) {
         if (semaphore_) {
@@ -523,7 +586,7 @@
                 // we need to release the spot manually
                 LOG_ERROR("batchMessageAndSend | Failed to createOpSendMsg: " << result);
                 releaseSemaphoreForSendOp(opSendMsg);
-                failures.add(std::bind(opSendMsg.sendCallback_, result, MessageId{}));
+                failures.add([opSendMsg, result] { opSendMsg.complete(result, {}); });
             }
         } else if (numBatches > 1) {
             std::vector<OpSendMsg> opSendMsgs;
@@ -537,7 +600,9 @@
                     LOG_ERROR("batchMessageAndSend | Failed to createOpSendMsgs[" << i
                                                                                   << "]: " << results[i]);
                     releaseSemaphoreForSendOp(opSendMsgs[i]);
-                    failures.add(std::bind(opSendMsgs[i].sendCallback_, results[i], MessageId{}));
+                    const auto& opSendMsg = opSendMsgs[i];
+                    const auto result = results[i];
+                    failures.add([opSendMsg, result] { opSendMsg.complete(result, {}); });
                 }
             }
         }  // else numBatches is 0, do nothing
@@ -551,7 +616,7 @@
 // a. we have a reserved spot on the queue
 // b. call this function after acquiring the ProducerImpl mutex_
 void ProducerImpl::sendMessage(const OpSendMsg& op) {
-    const auto sequenceId = op.msg_.impl_->metadata.sequence_id();
+    const auto sequenceId = op.metadata_.sequence_id();
     LOG_DEBUG("Inserting data to pendingMessagesQueue_");
     pendingMessagesQueue_.push_back(op);
 
@@ -748,13 +813,11 @@
         LOG_DEBUG(getName() << "Remove corrupt message from queue " << sequenceId);
         pendingMessagesQueue_.pop_front();
         lock.unlock();
-        if (op.sendCallback_) {
+        try {
             // to protect from client callback exception
-            try {
-                op.sendCallback_(ResultChecksumError, op.msg_.getMessageId());
-            } catch (const std::exception& e) {
-                LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
-            }
+            op.complete(ResultChecksumError, {});
+        } catch (const std::exception& e) {
+            LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
         }
         releaseSemaphoreForSendOp(op);
         return true;
@@ -795,12 +858,10 @@
         pendingMessagesQueue_.pop_front();
 
         lock.unlock();
-        if (op.sendCallback_) {
-            try {
-                op.sendCallback_(ResultOk, messageId);
-            } catch (const std::exception& e) {
-                LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
-            }
+        try {
+            op.complete(ResultOk, messageId);
+        } catch (const std::exception& e) {
+            LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
         }
         return true;
     }
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index d29efed..3dccf91 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -47,12 +47,13 @@
 
 class Producer;
 class MemoryLimitController;
+class TopicName;
 
 class ProducerImpl : public HandlerBase,
                      public std::enable_shared_from_this<ProducerImpl>,
                      public ProducerImplBase {
    public:
-    ProducerImpl(ClientImplPtr client, const std::string& topic,
+    ProducerImpl(ClientImplPtr client, const TopicName& topic,
                  const ProducerConfiguration& producerConfiguration, int32_t partition = -1);
     ~ProducerImpl();
 
@@ -83,6 +84,8 @@
 
     int32_t partition() const noexcept { return partition_; }
 
+    static int getNumOfChunks(uint32_t size, uint32_t maxMessageSize);
+
    protected:
     ProducerStatsBasePtr producerStatsBasePtr_;
 
@@ -115,8 +118,6 @@
     void handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
                               const ResponseData& responseData);
 
-    void statsCallBackHandler(Result, const MessageId&, SendCallback, boost::posix_time::ptime);
-
     void handleClose(Result result, ResultCallback callback, ProducerImplPtr producer);
 
     void resendMessages(ClientConnectionPtr cnx);
@@ -125,12 +126,26 @@
     bool encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& payload,
                         SharedBuffer& encryptedPayload);
 
+    void sendAsyncWithStatsUpdate(const Message& msg, const SendCallback& callback);
+
+    /**
+     * Reserve a spot in the messages queue before acquiring the ProducerImpl mutex. When the queue is full,
+     * this call will block until a spot is available if blockIfQueueIsFull is true. Otherwise, it will return
+     * ResultProducerQueueIsFull immediately.
+     *
+     * It also checks whether the memory could reach the limit after `payloadSize` is added. If so, this call
+     * will block until enough memory could be retained.
+     */
     Result canEnqueueRequest(uint32_t payloadSize);
+
     void releaseSemaphore(uint32_t payloadSize);
     void releaseSemaphoreForSendOp(const OpSendMsg& op);
 
     void cancelTimers();
 
+    bool isValidProducerState(const SendCallback& callback) const;
+    bool canAddToBatch(const Message& msg) const;
+
     typedef std::unique_lock<std::mutex> Lock;
 
     ProducerConfiguration conf_;
@@ -138,7 +153,7 @@
     std::unique_ptr<Semaphore> semaphore_;
     MessageQueue pendingMessagesQueue_;
 
-    int32_t partition_;  // -1 if topic is non-partitioned
+    const int32_t partition_;  // -1 if topic is non-partitioned
     std::string producerName_;
     bool userProvidedProducerName_;
     std::string producerStr_;
@@ -169,6 +184,7 @@
     uint32_t dataKeyGenIntervalSec_;
 
     MemoryLimitController& memoryLimitController_;
+    const bool chunkingEnabled_;
 };
 
 struct ProducerImplCmp {
diff --git a/pulsar-client-cpp/lib/SharedBuffer.h b/pulsar-client-cpp/lib/SharedBuffer.h
index d544299..be889a7 100644
--- a/pulsar-client-cpp/lib/SharedBuffer.h
+++ b/pulsar-client-cpp/lib/SharedBuffer.h
@@ -94,13 +94,13 @@
     /**
      * Return a shared buffer that include a portion of current buffer. No memory is copied
      */
-    SharedBuffer slice(uint32_t offset) {
+    SharedBuffer slice(uint32_t offset) const {
         SharedBuffer buf(*this);
         buf.consume(offset);
         return buf;
     }
 
-    SharedBuffer slice(uint32_t offset, uint32_t length) {
+    SharedBuffer slice(uint32_t offset, uint32_t length) const {
         SharedBuffer buf(*this);
         buf.consume(offset);
         assert(buf.readableBytes() >= length);
@@ -165,7 +165,7 @@
     }
 
     // Return current writer index
-    uint32_t writerIndex() { return writeIdx_; }
+    uint32_t writerIndex() const noexcept { return writeIdx_; }
 
     // skip writerIndex
     void skipBytes(uint32_t size) {
@@ -180,7 +180,7 @@
     }
 
     // Return current reader index
-    uint32_t readerIndex() { return readIdx_; }
+    uint32_t readerIndex() const noexcept { return readIdx_; }
 
     // set readerIndex
     void setReaderIndex(uint32_t index) {
diff --git a/pulsar-client-cpp/lib/TopicName.cc b/pulsar-client-cpp/lib/TopicName.cc
index a56b979..2e6232c 100644
--- a/pulsar-client-cpp/lib/TopicName.cc
+++ b/pulsar-client-cpp/lib/TopicName.cc
@@ -216,7 +216,7 @@
     return ss.str();
 }
 
-std::string TopicName::toString() {
+std::string TopicName::toString() const {
     std::stringstream ss;
     std::string seperator("/");
     if (isV2Topic_ && cluster_.empty()) {
@@ -230,7 +230,7 @@
 
 bool TopicName::isPersistent() const { return this->domain_ == TopicDomain::Persistent; }
 
-const std::string TopicName::getTopicPartitionName(unsigned int partition) {
+std::string TopicName::getTopicPartitionName(unsigned int partition) const {
     std::stringstream topicPartitionName;
     // make this topic name as well
     topicPartitionName << toString() << PartitionedProducerImpl::PARTITION_NAME_SUFFIX << partition;
diff --git a/pulsar-client-cpp/lib/TopicName.h b/pulsar-client-cpp/lib/TopicName.h
index 248001c..1d5deab 100644
--- a/pulsar-client-cpp/lib/TopicName.h
+++ b/pulsar-client-cpp/lib/TopicName.h
@@ -55,14 +55,14 @@
     std::string getNamespacePortion();
     std::string getLocalName();
     std::string getEncodedLocalName();
-    std::string toString();
+    std::string toString() const;
     bool isPersistent() const;
     NamespaceNamePtr getNamespaceName();
     int getPartitionIndex() const noexcept { return partition_; }
     static std::shared_ptr<TopicName> get(const std::string& topicName);
     bool operator==(const TopicName& other);
     static std::string getEncodedName(const std::string& nameBeforeEncoding);
-    const std::string getTopicPartitionName(unsigned int partition);
+    std::string getTopicPartitionName(unsigned int partition) const;
     static int getPartitionIndex(const std::string& topic);
 
    private:
diff --git a/pulsar-client-cpp/lib/stats/ProducerStatsBase.h b/pulsar-client-cpp/lib/stats/ProducerStatsBase.h
index 494303b..0ae16d1 100644
--- a/pulsar-client-cpp/lib/stats/ProducerStatsBase.h
+++ b/pulsar-client-cpp/lib/stats/ProducerStatsBase.h
@@ -27,7 +27,7 @@
 class ProducerStatsBase {
    public:
     virtual void messageSent(const Message& msg) = 0;
-    virtual void messageReceived(Result&, boost::posix_time::ptime&) = 0;
+    virtual void messageReceived(Result, const boost::posix_time::ptime&) = 0;
     virtual ~ProducerStatsBase(){};
 };
 
diff --git a/pulsar-client-cpp/lib/stats/ProducerStatsDisabled.h b/pulsar-client-cpp/lib/stats/ProducerStatsDisabled.h
index f81c8aa..6568c07 100644
--- a/pulsar-client-cpp/lib/stats/ProducerStatsDisabled.h
+++ b/pulsar-client-cpp/lib/stats/ProducerStatsDisabled.h
@@ -25,7 +25,7 @@
 class ProducerStatsDisabled : public ProducerStatsBase {
    public:
     virtual void messageSent(const Message& msg){};
-    virtual void messageReceived(Result&, boost::posix_time::ptime&){};
+    virtual void messageReceived(Result, const boost::posix_time::ptime&){};
 };
 }  // namespace pulsar
 #endif  // PULSAR_PRODUCER_STATS_DISABLED_HEADER
diff --git a/pulsar-client-cpp/lib/stats/ProducerStatsImpl.cc b/pulsar-client-cpp/lib/stats/ProducerStatsImpl.cc
index af7ae4b..811c0e1 100644
--- a/pulsar-client-cpp/lib/stats/ProducerStatsImpl.cc
+++ b/pulsar-client-cpp/lib/stats/ProducerStatsImpl.cc
@@ -93,7 +93,7 @@
     totalBytesSent_ += msg.getLength();
 }
 
-void ProducerStatsImpl::messageReceived(Result& res, boost::posix_time::ptime& publishTime) {
+void ProducerStatsImpl::messageReceived(Result res, const boost::posix_time::ptime& publishTime) {
     boost::posix_time::ptime currentTime = boost::posix_time::microsec_clock::universal_time();
     double diffInMicros = (currentTime - publishTime).total_microseconds();
     Lock lock(mutex_);
diff --git a/pulsar-client-cpp/lib/stats/ProducerStatsImpl.h b/pulsar-client-cpp/lib/stats/ProducerStatsImpl.h
index e82628b..27ffacc 100644
--- a/pulsar-client-cpp/lib/stats/ProducerStatsImpl.h
+++ b/pulsar-client-cpp/lib/stats/ProducerStatsImpl.h
@@ -82,7 +82,7 @@
 
     void messageSent(const Message&);
 
-    void messageReceived(Result&, boost::posix_time::ptime&);
+    void messageReceived(Result, const boost::posix_time::ptime&);
 
     ~ProducerStatsImpl();
 
diff --git a/pulsar-client-cpp/test-conf/standalone-ssl.conf b/pulsar-client-cpp/test-conf/standalone-ssl.conf
index 90c4822..5c94e58 100644
--- a/pulsar-client-cpp/test-conf/standalone-ssl.conf
+++ b/pulsar-client-cpp/test-conf/standalone-ssl.conf
@@ -300,4 +300,7 @@
 globalZookeeperServers={{ zookeeper_servers }}
 
 # Deprecated. Use brokerDeleteInactiveTopicsFrequencySeconds
-brokerServicePurgeInactiveFrequencyInSeconds=60
\ No newline at end of file
+brokerServicePurgeInactiveFrequencyInSeconds=60
+
+# Given a specific limit of the max message size
+maxMessageSize=1024000
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index be65775..da5c609 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -602,7 +602,7 @@
     Result result = client.createProducer(topicName, conf, producer);
     ASSERT_EQ(ResultOk, result);
 
-    int size = Commands::DefaultMaxMessageSize + 1000 * 100;
+    int size = ClientConnection::getMaxMessageSize() + 1000 * 100;
     char *content = new char[size];
     memset(content, 0, size);
     Message msg = MessageBuilder().setAllocatedContent(content, size).build();
@@ -610,7 +610,7 @@
     ASSERT_EQ(ResultMessageTooBig, result);
 
     // Anything up to MaxMessageSize should be allowed
-    size = Commands::DefaultMaxMessageSize;
+    size = ClientConnection::getMaxMessageSize();
     msg = MessageBuilder().setAllocatedContent(content, size).build();
     result = producer.send(msg);
     ASSERT_EQ(ResultOk, result);
@@ -1156,7 +1156,7 @@
     result = producerFuture.get(producer2);
     ASSERT_EQ(ResultOk, result);
 
-    int size = Commands::DefaultMaxMessageSize + 1000 * 100;
+    int size = ClientConnection::getMaxMessageSize() + 1000 * 100;
     char *content = new char[size];
     memset(content, 0, size);
     Message msg = MessageBuilder().setAllocatedContent(content, size).build();
@@ -1208,7 +1208,7 @@
     result = client.createProducer(topicName, conf2, producer2);
     ASSERT_EQ(ResultOk, result);
 
-    int size = Commands::DefaultMaxMessageSize + 1000 * 100;
+    int size = ClientConnection::getMaxMessageSize() + 1000 * 100;
     char *content = new char[size];
     memset(content, 0, size);
     Message msg = MessageBuilder().setAllocatedContent(content, size).build();
diff --git a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
index 199b50c..57ed0ec 100644
--- a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
@@ -59,6 +59,8 @@
     ASSERT_EQ(conf.isReplicateSubscriptionStateEnabled(), false);
     ASSERT_EQ(conf.getProperties().empty(), true);
     ASSERT_EQ(conf.getPriorityLevel(), 0);
+    ASSERT_EQ(conf.getMaxPendingChunkedMessage(), 100);
+    ASSERT_EQ(conf.isAutoOldestChunkedMessageOnQueueFull(), false);
 }
 
 TEST(ConsumerConfigurationTest, testCustomConfig) {
@@ -139,6 +141,12 @@
 
     conf.setPriorityLevel(1);
     ASSERT_EQ(conf.getPriorityLevel(), 1);
+
+    conf.setMaxPendingChunkedMessage(500);
+    ASSERT_EQ(conf.getMaxPendingChunkedMessage(), 500);
+
+    conf.setAutoOldestChunkedMessageOnQueueFull(true);
+    ASSERT_TRUE(conf.isAutoOldestChunkedMessageOnQueueFull());
 }
 
 TEST(ConsumerConfigurationTest, testReadCompactPersistentExclusive) {
diff --git a/pulsar-client-cpp/tests/MapCacheTest.cc b/pulsar-client-cpp/tests/MapCacheTest.cc
new file mode 100644
index 0000000..12a89ee
--- /dev/null
+++ b/pulsar-client-cpp/tests/MapCacheTest.cc
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <lib/MapCache.h>
+
+using namespace pulsar;
+
+struct MoveOnlyInt {
+    int x = 0;
+
+    MoveOnlyInt() = default;
+    MoveOnlyInt(int xx) : x(xx) {}
+    MoveOnlyInt(const MoveOnlyInt&) = delete;
+    MoveOnlyInt(MoveOnlyInt&& rhs) noexcept : x(rhs.x) {}
+
+    bool operator=(const MoveOnlyInt& rhs) const { return x == rhs.x; }
+};
+
+TEST(MapCacheTest, testPutIfAbsent) {
+    MapCache<int, MoveOnlyInt> cache;
+
+    ASSERT_NE(cache.putIfAbsent(1, {100}), cache.end());
+    ASSERT_EQ(cache.putIfAbsent(1, {200}), cache.end());
+    auto it = cache.find(1);
+    ASSERT_NE(it, cache.end());
+    ASSERT_EQ(it->second.x, 100);
+
+    cache.remove(1);
+    ASSERT_EQ(cache.find(1), cache.end());
+}
+
+TEST(MapCacheTest, testRemoveOldestValues) {
+    MapCache<int, MoveOnlyInt> cache;
+    cache.putIfAbsent(1, {200});
+    cache.putIfAbsent(2, {210});
+    cache.putIfAbsent(3, {220});
+    ASSERT_EQ(cache.getKeys(), (std::vector<int>{1, 2, 3}));
+
+    std::vector<int> removedValues;
+    cache.removeOldestValues(2, [&removedValues](const int& key, const MoveOnlyInt& value) {
+        removedValues.emplace_back(value.x);
+    });
+    ASSERT_EQ(removedValues, (std::vector<int>{200, 210}));
+
+    ASSERT_EQ(cache.getKeys(), (std::vector<int>{3}));
+    ASSERT_EQ(cache.size(), 1);
+    auto it = cache.find(3);
+    ASSERT_NE(it, cache.end());
+    ASSERT_EQ(it->second.x, 220);
+}
+
+TEST(MapCacheTest, testRemoveAllValues) {
+    MapCache<int, MoveOnlyInt> cache;
+    cache.putIfAbsent(1, {300});
+    cache.putIfAbsent(2, {310});
+    cache.putIfAbsent(3, {320});
+
+    // removeOldestValues works well even if the argument is greater than the size of keys
+    cache.removeOldestValues(10000, nullptr);
+    ASSERT_TRUE(cache.getKeys().empty());
+    ASSERT_EQ(cache.size(), 0);
+}
diff --git a/pulsar-client-cpp/tests/MessageChunkingTest.cc b/pulsar-client-cpp/tests/MessageChunkingTest.cc
new file mode 100644
index 0000000..ae0114c
--- /dev/null
+++ b/pulsar-client-cpp/tests/MessageChunkingTest.cc
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <ctime>
+#include <random>
+
+#include <pulsar/Client.h>
+#include <gtest/gtest.h>
+#include "lib/LogUtils.h"
+#include "PulsarFriend.h"
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+
+// See the `maxMessageSize` config in test-conf/standalone-ssl.conf
+static constexpr size_t maxMessageSize = 1024000;
+
+static std::string toString(CompressionType compressionType) {
+    switch (compressionType) {
+        case CompressionType::CompressionNone:
+            return "None";
+        case CompressionType::CompressionLZ4:
+            return "LZ4";
+        case CompressionType::CompressionZLib:
+            return "ZLib";
+        case CompressionType::CompressionZSTD:
+            return "ZSTD";
+        case CompressionType::CompressionSNAPPY:
+            return "SNAPPY";
+        default:
+            return "Unknown (" + std::to_string(compressionType) + ")";
+    }
+}
+
+inline std::string createLargeMessage() {
+    std::string largeMessage(maxMessageSize * 3, 'a');
+    std::default_random_engine e(time(nullptr));
+    std::uniform_int_distribution<unsigned> u(0, 25);
+    for (size_t i = 0; i < largeMessage.size(); i++) {
+        largeMessage[i] = 'a' + u(e);
+    }
+    return largeMessage;
+}
+
+class MessageChunkingTest : public ::testing::TestWithParam<CompressionType> {
+   public:
+    static std::string largeMessage;
+
+    void TearDown() override { client_.close(); }
+
+    void createProducer(const std::string& topic, Producer& producer) {
+        ProducerConfiguration conf;
+        conf.setBatchingEnabled(false);
+        conf.setChunkingEnabled(true);
+        conf.setCompressionType(GetParam());
+        LOG_INFO("Create producer to topic: " << topic
+                                              << ", compression: " << toString(conf.getCompressionType()));
+        ASSERT_EQ(ResultOk, client_.createProducer(topic, conf, producer));
+    }
+
+    void createConsumer(const std::string& topic, Consumer& consumer) {
+        ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", consumer));
+    }
+
+   private:
+    Client client_{lookupUrl};
+};
+
+std::string MessageChunkingTest::largeMessage = createLargeMessage();
+
+TEST_F(MessageChunkingTest, testInvalidConfig) {
+    Client client(lookupUrl);
+    ProducerConfiguration conf;
+    conf.setBatchingEnabled(true);
+    conf.setChunkingEnabled(true);
+    Producer producer;
+    ASSERT_THROW(client.createProducer("xxx", conf, producer), std::invalid_argument);
+    client.close();
+}
+
+TEST_P(MessageChunkingTest, testEndToEnd) {
+    const std::string topic =
+        "MessageChunkingTest-EndToEnd-" + toString(GetParam()) + std::to_string(time(nullptr));
+    Consumer consumer;
+    createConsumer(topic, consumer);
+    Producer producer;
+    createProducer(topic, producer);
+
+    constexpr int numMessages = 10;
+
+    std::vector<MessageId> sendMessageIds;
+    for (int i = 0; i < numMessages; i++) {
+        MessageId messageId;
+        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeMessage).build(), messageId));
+        LOG_INFO("Send " << i << " to " << messageId);
+        sendMessageIds.emplace_back(messageId);
+    }
+
+    Message msg;
+    std::vector<MessageId> receivedMessageIds;
+    for (int i = 0; i < numMessages; i++) {
+        ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
+        LOG_INFO("Receive " << msg.getLength() << " bytes from " << msg.getMessageId());
+        ASSERT_EQ(msg.getDataAsString(), largeMessage);
+        receivedMessageIds.emplace_back(msg.getMessageId());
+    }
+    ASSERT_EQ(receivedMessageIds, sendMessageIds);
+    ASSERT_EQ(receivedMessageIds.front().ledgerId(), receivedMessageIds.front().ledgerId());
+    ASSERT_GT(receivedMessageIds.back().entryId(), numMessages);
+
+    // Verify the cache has been cleared
+    auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer);
+    ASSERT_EQ(chunkedMessageCache.size(), 0);
+}
+
+// The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't have INSTANTIATE_TEST_SUITE_P
+INSTANTIATE_TEST_CASE_P(Pulsar, MessageChunkingTest,
+                        ::testing::Values(CompressionNone, CompressionLZ4, CompressionZLib, CompressionZSTD,
+                                          CompressionSNAPPY));
diff --git a/pulsar-client-cpp/tests/ProducerConfigurationTest.cc b/pulsar-client-cpp/tests/ProducerConfigurationTest.cc
index b88f6e4..5c54129 100644
--- a/pulsar-client-cpp/tests/ProducerConfigurationTest.cc
+++ b/pulsar-client-cpp/tests/ProducerConfigurationTest.cc
@@ -46,6 +46,7 @@
     ASSERT_EQ(conf.isEncryptionEnabled(), false);
     ASSERT_EQ(conf.getEncryptionKeys(), std::set<std::string>{});
     ASSERT_EQ(conf.getProperties().empty(), true);
+    ASSERT_EQ(conf.isChunkingEnabled(), false);
 }
 
 class MockMessageRoutingPolicy : public MessageRoutingPolicy {
@@ -129,4 +130,7 @@
     conf.setProperty("k1", "v1");
     ASSERT_EQ(conf.getProperties()["k1"], "v1");
     ASSERT_EQ(conf.hasProperty("k1"), true);
+
+    conf.setChunkingEnabled(true);
+    ASSERT_EQ(conf.isChunkingEnabled(), true);
 }
diff --git a/pulsar-client-cpp/tests/ProducerTest.cc b/pulsar-client-cpp/tests/ProducerTest.cc
index 210f013..3ee8def 100644
--- a/pulsar-client-cpp/tests/ProducerTest.cc
+++ b/pulsar-client-cpp/tests/ProducerTest.cc
@@ -26,6 +26,7 @@
 #include "lib/Utils.h"
 #include "lib/Latch.h"
 #include "lib/LogUtils.h"
+#include "lib/ProducerImpl.h"
 DECLARE_LOG_OBJECT()
 
 using namespace pulsar;
@@ -240,4 +241,12 @@
         client.close();
         LOG_INFO("End of run " << run);
     }
-}
\ No newline at end of file
+}
+
+TEST(ProducerTest, testGetNumOfChunks) {
+    ASSERT_EQ(ProducerImpl::getNumOfChunks(11, 5), 3);
+    ASSERT_EQ(ProducerImpl::getNumOfChunks(10, 5), 2);
+    ASSERT_EQ(ProducerImpl::getNumOfChunks(8, 5), 2);
+    ASSERT_EQ(ProducerImpl::getNumOfChunks(4, 5), 1);
+    ASSERT_EQ(ProducerImpl::getNumOfChunks(1, 0), 1);
+}
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h
index aed7096..c7aa3a6 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -79,6 +79,12 @@
         return std::static_pointer_cast<ConsumerImpl>(consumer.impl_);
     }
 
+    static decltype(ConsumerImpl::chunkedMessageCache_) & getChunkedMessageCache(Consumer consumer) {
+        auto consumerImpl = getConsumerImplPtr(consumer);
+        ConsumerImpl::Lock lock(consumerImpl->chunkProcessMutex_);
+        return consumerImpl->chunkedMessageCache_;
+    }
+
     static std::shared_ptr<PartitionedConsumerImpl> getPartitionedConsumerImplPtr(Consumer consumer) {
         return std::static_pointer_cast<PartitionedConsumerImpl>(consumer.impl_);
     }