Fifo opt (#732)

* Prepare to optimize FIFO publishing

Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>

* fix: SendReceipt now contains std::unique_ptr<Message> being sent

Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>

* fix: add doc explaining why we take ownership of the message being sent

Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>

* feat: implement FifoProducerPartition

Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>

* feat: implement FifoProducerImpl

Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>

* feat: implement builder for FifoProducer

Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>

* fix: prepare to debug

Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>

* fix: log message-sending stages

Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>

---------

Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
diff --git a/cpp/examples/CMakeLists.txt b/cpp/examples/CMakeLists.txt
index 8d6b039..2730447 100644
--- a/cpp/examples/CMakeLists.txt
+++ b/cpp/examples/CMakeLists.txt
@@ -4,6 +4,7 @@
 endfunction()
 
 add_example(example_producer ExampleProducer.cpp)
+add_example(example_fifo_producer ExampleFifoProducer.cpp)
 add_example(example_producer_with_async ExampleProducerWithAsync.cpp)
 add_example(example_producer_with_fifo_message ExampleProducerWithFifoMessage.cpp)
 add_example(example_producer_with_timed_message ExampleProducerWithTimedMessage.cpp)
diff --git a/cpp/examples/ExampleFifoProducer.cpp b/cpp/examples/ExampleFifoProducer.cpp
new file mode 100644
index 0000000..9d99be3
--- /dev/null
+++ b/cpp/examples/ExampleFifoProducer.cpp
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <algorithm>
+#include <atomic>
+#include <chrono>
+#include <condition_variable>
+#include <cstddef>
+#include <cstdlib>
+#include <exception>
+#include <iostream>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <string>
+#include <system_error>
+#include <thread>
+#include <utility>
+
+#include "gflags/gflags.h"
+#include "rocketmq/CredentialsProvider.h"
+#include "rocketmq/FifoProducer.h"
+#include "rocketmq/Logger.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/Producer.h"
+#include "rocketmq/SendReceipt.h"
+
+using namespace ROCKETMQ_NAMESPACE;
+
+/**
+ * @brief A simple counting semaphore used to bound the number of in-flight send requests.
+ */
+class Semaphore {
+public:
+  explicit Semaphore(std::size_t permits) : permits_(permits) {
+  }
+
+  /**
+   * @brief Block until a permit is available, then take it.
+   */
+  void acquire() {
+    std::unique_lock<std::mutex> lk(mtx_);
+    cv_.wait(lk, [this]() { return permits_ > 0; });
+    permits_--;
+  }
+
+  /**
+   * @brief Return a permit and wake one waiter.
+   *
+   * A waiter is notified on every release, not only on the 0 -> 1 transition: otherwise, when several threads are
+   * blocked in acquire() and multiple permits are released before any of them wakes up, some waiters would never be
+   * notified even though permits are available.
+   */
+  void release() {
+    std::lock_guard<std::mutex> lk(mtx_);
+    permits_++;
+    cv_.notify_one();
+  }
+
+private:
+  std::size_t permits_{0};
+  std::mutex mtx_;
+  std::condition_variable cv_;
+};
+
+const std::string& alphaNumeric() {
+  static std::string alpha_numeric("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ");
+  return alpha_numeric;
+}
+
+/**
+ * @brief Build a random alpha-numeric string of exactly `len` characters.
+ */
+std::string randomString(std::string::size_type len) {
+  std::string result;
+  result.reserve(len);
+  std::random_device rd;
+  std::mt19937 generator(rd());
+  std::string source(alphaNumeric());
+  while (result.size() < len) {
+    std::shuffle(source.begin(), source.end(), generator);
+    result.append(source, 0, std::min(len - result.size(), source.size()));
+  }
+  return result;
+}
+
+DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published");
+DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
+DEFINE_int32(message_body_size, 4096, "Message body size");
+DEFINE_uint32(total, 256, "Number of sample messages to publish");
+DEFINE_string(access_key, "", "Your access key ID");
+DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
+DEFINE_uint32(concurrency, 16, "Concurrency of FIFO producer");
+
+int main(int argc, char* argv[]) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  // Message groups are assigned as `i % FLAGS_concurrency` below, so zero would divide by zero.
+  if (FLAGS_concurrency == 0) {
+    std::cerr << "--concurrency must be greater than 0" << std::endl;
+    return EXIT_FAILURE;
+  }
+
+  auto& logger = getLogger();
+  logger.setConsoleLevel(Level::Debug);
+  logger.setLevel(Level::Debug);
+  logger.init();
+
+  // Access Key/Secret pair may be acquired from management console
+  CredentialsProviderPtr credentials_provider;
+  if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
+    credentials_provider = std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret);
+  }
+
+  // In most case, you don't need to create too many producers, singleton pattern is recommended.
+  auto producer = FifoProducer::newBuilder()
+                      .withConfiguration(Configuration::newBuilder()
+                                             .withEndpoints(FLAGS_access_point)
+                                             .withCredentialsProvider(credentials_provider)
+                                             .withSsl(FLAGS_tls)
+                                             .build())
+                      .withConcurrency(FLAGS_concurrency)
+                      .withTopics({FLAGS_topic})
+                      .build();
+
+  // Before C++20 a default-constructed std::atomic holds an indeterminate value, so initialize explicitly.
+  std::atomic_bool stopped(false);
+  std::atomic_long count(0);
+
+  // Every second, print and reset the number of sends completed during that second.
+  auto stats_lambda = [&] {
+    while (!stopped.load(std::memory_order_relaxed)) {
+      std::this_thread::sleep_for(std::chrono::seconds(1));
+      std::cout << "QPS: " << count.exchange(0) << std::endl;
+    }
+  };
+
+  std::thread stats_thread(stats_lambda);
+
+  std::string body = randomString(FLAGS_message_body_size);
+
+  // `completed` is updated from SDK callback threads and read by the main thread, so it is guarded by `mtx`.
+  std::size_t completed = 0;
+  // Number of messages actually handed to the producer; only accessed by the main thread.
+  std::size_t dispatched = 0;
+  std::mutex mtx;
+  std::condition_variable cv;
+
+  // Bound the number of in-flight send requests to the configured concurrency.
+  Semaphore semaphore(FLAGS_concurrency);
+
+  try {
+    for (std::size_t i = 0; i < FLAGS_total; ++i) {
+      auto message = Message::newBuilder()
+                         .withTopic(FLAGS_topic)
+                         .withTag("TagA")
+                         .withKeys({"Key-" + std::to_string(i)})
+                         .withGroup("message-group" + std::to_string(i % FLAGS_concurrency))
+                         .withBody(body)
+                         .build();
+
+      auto callback = [&](const std::error_code& ec, const SendReceipt&) {
+        if (ec) {
+          std::cerr << "Failed to send message: " << ec.message() << std::endl;
+        }
+        count++;
+        semaphore.release();
+
+        // Update and notify while holding the lock: this rules out a lost wake-up, and main() cannot proceed to
+        // destroy `cv` until this callback has released `mtx`.
+        std::lock_guard<std::mutex> lk(mtx);
+        completed++;
+        cv.notify_all();
+      };
+
+      semaphore.acquire();
+      producer.send(std::move(message), callback);
+      dispatched++;
+      std::cout << "Cached No." << i << " message" << std::endl;
+    }
+  } catch (const std::exception& e) {
+    std::cerr << "Ah...No!!! " << e.what() << std::endl;
+  } catch (...) {
+    std::cerr << "Ah...No!!!" << std::endl;
+  }
+
+  // Wait only for messages that were actually dispatched, so an early exit from the loop cannot hang here.
+  {
+    std::unique_lock<std::mutex> lk(mtx);
+    cv.wait(lk, [&]() { return completed >= dispatched; });
+    std::cout << "Completed: " << completed << ", total: " << FLAGS_total << std::endl;
+  }
+
+  stopped.store(true, std::memory_order_relaxed);
+  if (stats_thread.joinable()) {
+    stats_thread.join();
+  }
+
+  return EXIT_SUCCESS;
+}
diff --git a/cpp/examples/ExampleProducerWithAsync.cpp b/cpp/examples/ExampleProducerWithAsync.cpp
index 63b7611..d88dfc8 100644
--- a/cpp/examples/ExampleProducerWithAsync.cpp
+++ b/cpp/examples/ExampleProducerWithAsync.cpp
@@ -17,7 +17,6 @@
 #include <algorithm>
 #include <atomic>
 #include <condition_variable>
-#include <cstdint>
 #include <iostream>
 #include <mutex>
 #include <random>
diff --git a/cpp/examples/ExamplePushConsumer.cpp b/cpp/examples/ExamplePushConsumer.cpp
index ab106cb..66a85f4 100644
--- a/cpp/examples/ExamplePushConsumer.cpp
+++ b/cpp/examples/ExamplePushConsumer.cpp
@@ -16,7 +16,6 @@
  */
 #include <chrono>
 #include <iostream>
-#include <mutex>
 #include <thread>
 
 #include "gflags/gflags.h"
diff --git a/cpp/examples/ExampleSimpleConsumer.cpp b/cpp/examples/ExampleSimpleConsumer.cpp
index 17a84b7..aedec71 100644
--- a/cpp/examples/ExampleSimpleConsumer.cpp
+++ b/cpp/examples/ExampleSimpleConsumer.cpp
@@ -16,7 +16,6 @@
  */
 #include <chrono>
 #include <iostream>
-#include <thread>
 
 #include "gflags/gflags.h"
 #include "rocketmq/Logger.h"
diff --git a/cpp/include/rocketmq/Configuration.h b/cpp/include/rocketmq/Configuration.h
index 0037c27..6dcd413 100644
--- a/cpp/include/rocketmq/Configuration.h
+++ b/cpp/include/rocketmq/Configuration.h
@@ -44,7 +44,7 @@
   }
 
   bool withSsl() const {
-    return withSsl_;
+    return tls_;
   }
 
 protected:
@@ -56,7 +56,7 @@
   std::string               endpoints_;
   CredentialsProviderPtr    credentials_provider_;
   std::chrono::milliseconds request_timeout_{ConfigurationDefaults::RequestTimeout};
-  bool withSsl_ = true;
+  bool tls_ = true;
 };
 
 class ConfigurationBuilder {
@@ -67,7 +67,7 @@
 
   ConfigurationBuilder& withRequestTimeout(std::chrono::milliseconds request_timeout);
 
-  ConfigurationBuilder& withSsl(bool enable);
+  ConfigurationBuilder& withSsl(bool with_ssl);
 
   Configuration build();
 
diff --git a/cpp/include/rocketmq/FifoProducer.h b/cpp/include/rocketmq/FifoProducer.h
new file mode 100644
index 0000000..fbf7662
--- /dev/null
+++ b/cpp/include/rocketmq/FifoProducer.h
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "Configuration.h"
+#include "Message.h"
+#include "RocketMQ.h"
+#include "SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class FifoProducerImpl;
+class FifoProducerBuilder;
+class ProducerImpl;
+
+/**
+ * @brief Producer for FIFO (message-group ordered) publishing.
+ *
+ * Messages are spread over a fixed number of partitions by hashing their message group, so all messages of one
+ * group go through the same partition. Instances are obtained from FifoProducer::newBuilder().
+ */
+class FifoProducer {
+public:
+  /**
+   * @brief Create a builder used to configure, build and start a FifoProducer.
+   */
+  static FifoProducerBuilder newBuilder();
+
+  /**
+   * @brief Publish a message asynchronously.
+   *
+   * @param message Message to publish; ownership is transferred to the producer.
+   * @param callback Invoked asynchronously with the outcome of the send.
+   */
+  void send(MessageConstPtr message, SendCallback callback);
+
+private:
+  std::shared_ptr<FifoProducerImpl> impl_;
+
+  explicit FifoProducer(std::shared_ptr<FifoProducerImpl> impl) : impl_(std::move(impl)) {
+  }
+
+  void start();
+
+  friend class FifoProducerBuilder;
+};
+
+/**
+ * @brief Fluent builder for FifoProducer.
+ */
+class FifoProducerBuilder {
+public:
+  FifoProducerBuilder();
+
+  /**
+   * @brief Apply endpoints, credentials, request timeout and TLS settings from `configuration`.
+   */
+  FifoProducerBuilder& withConfiguration(Configuration configuration);
+
+  /**
+   * @brief Topics this producer is going to publish to (forwarded to the underlying producer).
+   */
+  FifoProducerBuilder& withTopics(const std::vector<std::string>& topics);
+
+  /**
+   * @brief Number of partitions that messages are distributed across, keyed by message group.
+   */
+  FifoProducerBuilder& withConcurrency(std::size_t concurrency);
+
+  /**
+   * @brief Build the producer and start it.
+   */
+  FifoProducer build();
+
+private:
+  std::shared_ptr<FifoProducerImpl> impl_;
+  std::shared_ptr<ProducerImpl> producer_impl_;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/include/rocketmq/Producer.h b/cpp/include/rocketmq/Producer.h
index 42004eb..6b42843 100644
--- a/cpp/include/rocketmq/Producer.h
+++ b/cpp/include/rocketmq/Producer.h
@@ -16,20 +16,17 @@
  */
 #pragma once
 
-#include <chrono>
-#include <functional>
 #include <memory>
 #include <system_error>
 #include <vector>
 
 #include "Configuration.h"
-#include "ErrorCode.h"
-#include "Logger.h"
 #include "Message.h"
 #include "SendCallback.h"
 #include "SendReceipt.h"
 #include "Transaction.h"
 #include "TransactionChecker.h"
+#include "rocketmq/Logger.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
diff --git a/cpp/include/rocketmq/SendReceipt.h b/cpp/include/rocketmq/SendReceipt.h
index 489df5e..7eef6e7 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/include/rocketmq/SendReceipt.h
@@ -16,20 +16,21 @@
  */
 #pragma once
 
-#include <cstdint>
 #include <string>
-#include <utility>
 
 #include "RocketMQ.h"
+#include "rocketmq/Message.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
 struct SendReceipt {
+  std::string target;
+
   std::string message_id;
 
   std::string transaction_id;
 
-  std::string target;
+  MessageConstPtr message;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/base/Configuration.cpp b/cpp/source/base/Configuration.cpp
index 2a136d5..66cff2e 100644
--- a/cpp/source/base/Configuration.cpp
+++ b/cpp/source/base/Configuration.cpp
@@ -38,8 +38,8 @@
   return *this;
 }
 
-ConfigurationBuilder& ConfigurationBuilder::withSsl(bool enable) {
-  configuration_.withSsl_ = enable;
+ConfigurationBuilder& ConfigurationBuilder::withSsl(bool with_ssl) {
+  configuration_.tls_ = with_ssl;
   return *this;
 }
 
diff --git a/cpp/source/client/ClientManagerImpl.cpp b/cpp/source/client/ClientManagerImpl.cpp
index 643d374..7d724c7 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -27,32 +27,27 @@
 #include "InvocationContext.h"
 #include "LogInterceptor.h"
 #include "LogInterceptorFactory.h"
-#include "MessageExt.h"
-#include "MetadataConstants.h"
 #include "MixAll.h"
 #include "Protocol.h"
 #include "ReceiveMessageContext.h"
 #include "RpcClient.h"
 #include "RpcClientImpl.h"
 #include "Scheduler.h"
-#include "TlsHelper.h"
+#include "SchedulerImpl.h"
 #include "UtilAll.h"
-#include "apache/rocketmq/v2/definition.pb.h"
 #include "google/protobuf/util/time_util.h"
 #include "grpcpp/create_channel.h"
 #include "rocketmq/ErrorCode.h"
-#include "rocketmq/Logger.h"
-#include "rocketmq/SendReceipt.h"
 #include "spdlog/spdlog.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool withSsl)
+ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool with_ssl)
     : scheduler_(std::make_shared<SchedulerImpl>()),
       resource_namespace_(std::move(resource_namespace)),
       state_(State::CREATED),
       callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(std::thread::hardware_concurrency())),
-      withSsl_(withSsl) {
+      with_ssl_(with_ssl) {
   certificate_verifier_ = grpc::experimental::ExternalCertificateVerifier::Create<InsecureCertificateVerifier>();
   tls_channel_credential_options_.set_verify_server_certs(false);
   tls_channel_credential_options_.set_check_call_host(false);
@@ -285,7 +280,7 @@
 bool ClientManagerImpl::send(const std::string& target_host,
                              const Metadata& metadata,
                              SendMessageRequest& request,
-                             SendCallback cb) {
+                             SendResultCallback cb) {
   assert(cb);
   SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}", target_host, request.DebugString());
   RpcClientSharedPtr client = getRpcClient(target_host);
@@ -311,15 +306,14 @@
       return;
     }
 
-    SendReceipt send_receipt = {};
-    send_receipt.target = target_host;
-    std::error_code ec;
+    SendResult send_result = {};
+    send_result.target = target_host;
     if (!invocation_context->status.ok()) {
       SPDLOG_WARN("Failed to send message to {} due to gRPC error. gRPC code: {}, gRPC error message: {}",
                   invocation_context->remote_address, invocation_context->status.error_code(),
                   invocation_context->status.error_message());
-      ec = ErrorCode::RequestTimeout;
-      cb(ec, send_receipt);
+      send_result.ec = ErrorCode::RequestTimeout;
+      cb(send_result);
       return;
     }
 
@@ -328,8 +322,8 @@
       case rmq::Code::OK: {
         if (!invocation_context->response.entries().empty()) {
           auto first = invocation_context->response.entries().begin();
-          send_receipt.message_id = first->message_id();
-          send_receipt.transaction_id = first->transaction_id();
+          send_result.message_id = first->message_id();
+          send_result.transaction_id = first->transaction_id();
         } else {
           SPDLOG_ERROR("Unexpected send-message-response: {}", invocation_context->response.DebugString());
         }
@@ -338,127 +332,127 @@
 
       case rmq::Code::ILLEGAL_TOPIC: {
         SPDLOG_ERROR("IllegalTopic: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::IllegalTopic;
+        send_result.ec = ErrorCode::IllegalTopic;
         break;
       }
 
       case rmq::Code::ILLEGAL_MESSAGE_TAG: {
         SPDLOG_ERROR("IllegalMessageTag: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::IllegalMessageTag;
+        send_result.ec = ErrorCode::IllegalMessageTag;
         break;
       }
 
       case rmq::Code::ILLEGAL_MESSAGE_KEY: {
         SPDLOG_ERROR("IllegalMessageKey: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::IllegalMessageKey;
+        send_result.ec = ErrorCode::IllegalMessageKey;
         break;
       }
 
       case rmq::Code::ILLEGAL_MESSAGE_GROUP: {
         SPDLOG_ERROR("IllegalMessageGroup: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::IllegalMessageGroup;
+        send_result.ec = ErrorCode::IllegalMessageGroup;
         break;
       }
 
       case rmq::Code::ILLEGAL_MESSAGE_PROPERTY_KEY: {
         SPDLOG_ERROR("IllegalMessageProperty: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::IllegalMessageProperty;
+        send_result.ec = ErrorCode::IllegalMessageProperty;
         break;
       }
 
       case rmq::Code::MESSAGE_PROPERTIES_TOO_LARGE: {
         SPDLOG_ERROR("MessagePropertiesTooLarge: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::MessagePropertiesTooLarge;
+        send_result.ec = ErrorCode::MessagePropertiesTooLarge;
         break;
       }
 
       case rmq::Code::MESSAGE_BODY_TOO_LARGE: {
         SPDLOG_ERROR("MessageBodyTooLarge: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::MessageBodyTooLarge;
+        send_result.ec = ErrorCode::MessageBodyTooLarge;
         break;
       }
 
       case rmq::Code::TOPIC_NOT_FOUND: {
         SPDLOG_WARN("TopicNotFound: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::TopicNotFound;
+        send_result.ec = ErrorCode::TopicNotFound;
         break;
       }
 
       case rmq::Code::NOT_FOUND: {
         SPDLOG_WARN("NotFound: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::NotFound;
+        send_result.ec = ErrorCode::NotFound;
         break;
       }
 
       case rmq::Code::UNAUTHORIZED: {
         SPDLOG_WARN("Unauthenticated: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::Unauthorized;
+        send_result.ec = ErrorCode::Unauthorized;
         break;
       }
 
       case rmq::Code::FORBIDDEN: {
         SPDLOG_WARN("Forbidden: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::Forbidden;
+        send_result.ec = ErrorCode::Forbidden;
         break;
       }
 
       case rmq::Code::MESSAGE_CORRUPTED: {
         SPDLOG_WARN("MessageCorrupted: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::MessageCorrupted;
+        send_result.ec = ErrorCode::MessageCorrupted;
         break;
       }
 
       case rmq::Code::TOO_MANY_REQUESTS: {
         SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::TooManyRequests;
+        send_result.ec = ErrorCode::TooManyRequests;
         break;
       }
 
       case rmq::Code::INTERNAL_SERVER_ERROR: {
         SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::InternalServerError;
+        send_result.ec = ErrorCode::InternalServerError;
         break;
       }
 
       case rmq::Code::HA_NOT_AVAILABLE: {
         SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::InternalServerError;
+        send_result.ec = ErrorCode::InternalServerError;
         break;
       }
 
       case rmq::Code::PROXY_TIMEOUT: {
         SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::GatewayTimeout;
+        send_result.ec = ErrorCode::GatewayTimeout;
         break;
       }
 
       case rmq::Code::MASTER_PERSISTENCE_TIMEOUT: {
         SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::GatewayTimeout;
+        send_result.ec = ErrorCode::GatewayTimeout;
         break;
       }
 
       case rmq::Code::SLAVE_PERSISTENCE_TIMEOUT: {
         SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::GatewayTimeout;
+        send_result.ec = ErrorCode::GatewayTimeout;
         break;
       }
 
       case rmq::Code::MESSAGE_PROPERTY_CONFLICT_WITH_TYPE: {
         SPDLOG_WARN("Message-property-conflict-with-type: Host={}, Response={}", invocation_context->remote_address,
                     invocation_context->response.DebugString());
-        ec = ErrorCode::MessagePropertyConflictWithType;
+        send_result.ec = ErrorCode::MessagePropertyConflictWithType;
         break;
       }
 
       default: {
         SPDLOG_WARN("NotSupported: Check and upgrade SDK to the latest. Host={}", invocation_context->remote_address);
-        ec = ErrorCode::NotSupported;
+        send_result.ec = ErrorCode::NotSupported;
         break;
       }
     }
 
-    cb(ec, send_receipt);
+    cb(send_result);
   };
 
   invocation_context->callback = completion_callback;
@@ -476,7 +470,7 @@
   std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_factories;
   interceptor_factories.emplace_back(absl::make_unique<LogInterceptorFactory>());
   auto channel = grpc::experimental::CreateCustomChannelWithInterceptors(
-      target_host, withSsl_ ? channel_credential_ : grpc::InsecureChannelCredentials(), channel_arguments_,
+      target_host, with_ssl_ ? channel_credential_ : grpc::InsecureChannelCredentials(), channel_arguments_,
       std::move(interceptor_factories));
   return channel;
 }
@@ -520,28 +514,28 @@
   rpc_clients_.clear();
 }
 
-SendReceipt ClientManagerImpl::processSendResponse(const rmq::MessageQueue& message_queue,
-                                                   const SendMessageResponse& response,
-                                                   std::error_code& ec) {
-  SendReceipt send_receipt;
+SendResult ClientManagerImpl::processSendResponse(const rmq::MessageQueue& message_queue,
+                                                  const SendMessageResponse& response,
+                                                  std::error_code& ec) {
+  SendResult send_result;
 
   switch (response.status().code()) {
     case rmq::Code::OK: {
       assert(response.entries_size() > 0);
-      send_receipt.message_id = response.entries().begin()->message_id();
-      send_receipt.transaction_id = response.entries().begin()->transaction_id();
-      return send_receipt;
+      send_result.message_id = response.entries().begin()->message_id();
+      send_result.transaction_id = response.entries().begin()->transaction_id();
+      return send_result;
     }
     case rmq::Code::ILLEGAL_TOPIC: {
       ec = ErrorCode::BadRequest;
-      return send_receipt;
+      return send_result;
     }
     default: {
       // TODO: handle other cases.
       break;
     }
   }
-  return send_receipt;
+  return send_result;
 }
 
 void ClientManagerImpl::addClientObserver(std::weak_ptr<Client> client) {
diff --git a/cpp/source/client/include/ClientManager.h b/cpp/source/client/include/ClientManager.h
index 56325fa..02b232b 100644
--- a/cpp/source/client/include/ClientManager.h
+++ b/cpp/source/client/include/ClientManager.h
@@ -22,14 +22,12 @@
 #include <system_error>
 
 #include "Client.h"
-#include "MessageExt.h"
 #include "Metadata.h"
 #include "ReceiveMessageCallback.h"
 #include "RpcClient.h"
 #include "Scheduler.h"
-#include "TelemetryBidiReactor.h"
+#include "SendResultCallback.h"
 #include "TopicRouteData.h"
-#include "rocketmq/SendCallback.h"
 #include "rocketmq/State.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
@@ -93,8 +91,10 @@
   virtual void receiveMessage(const std::string& target, const Metadata& metadata, const ReceiveMessageRequest& request,
                               std::chrono::milliseconds timeout, ReceiveMessageCallback callback) = 0;
 
-  virtual bool send(const std::string& target_host, const Metadata& metadata, SendMessageRequest& request,
-                    SendCallback cb) = 0;
+  virtual bool send(const std::string& target_host,
+                    const Metadata& metadata,
+                    SendMessageRequest& request,
+                    SendResultCallback cb) = 0;
 
   virtual std::error_code notifyClientTermination(const std::string& target_host, const Metadata& metadata,
                                                   const NotifyClientTerminationRequest& request,
diff --git a/cpp/source/client/include/ClientManagerImpl.h b/cpp/source/client/include/ClientManagerImpl.h
index 653fcad..5f1b27c 100644
--- a/cpp/source/client/include/ClientManagerImpl.h
+++ b/cpp/source/client/include/ClientManagerImpl.h
@@ -20,7 +20,6 @@
 #include <chrono>
 #include <cstdint>
 #include <functional>
-#include <future>
 #include <memory>
 #include <string>
 #include <system_error>
@@ -29,18 +28,13 @@
 #include "Client.h"
 #include "ClientManager.h"
 #include "InsecureCertificateVerifier.h"
-#include "InvocationContext.h"
 #include "ReceiveMessageCallback.h"
 #include "RpcClientImpl.h"
-#include "SchedulerImpl.h"
-#include "SendMessageContext.h"
-#include "TelemetryBidiReactor.h"
 #include "ThreadPoolImpl.h"
 #include "TopicRouteData.h"
 #include "absl/base/thread_annotations.h"
 #include "absl/container/flat_hash_map.h"
 #include "absl/container/flat_hash_set.h"
-#include "absl/strings/string_view.h"
 #include "absl/synchronization/mutex.h"
 #include "rocketmq/State.h"
 
@@ -54,7 +48,7 @@
    * effectively.
    * @param resource_namespace Abstract resource namespace, in which this client manager lives.
    */
-  explicit ClientManagerImpl(std::string resource_namespace, bool withSsl = true);
+  explicit ClientManagerImpl(std::string resource_namespace, bool with_ssl = true);
 
   ~ClientManagerImpl() override;
 
@@ -89,7 +83,7 @@
   bool send(const std::string& target_host,
             const Metadata& metadata,
             SendMessageRequest& request,
-            SendCallback cb) override LOCKS_EXCLUDED(rpc_clients_mtx_);
+            SendResultCallback cb) override LOCKS_EXCLUDED(rpc_clients_mtx_);
 
   /**
    * Get a RpcClient according to the given target hosts, which follows scheme specified
@@ -105,7 +99,7 @@
   RpcClientSharedPtr getRpcClient(const std::string& target_host, bool need_heartbeat = true) override
       LOCKS_EXCLUDED(rpc_clients_mtx_);
 
-  static SendReceipt processSendResponse(const rmq::MessageQueue& message_queue,
+  static SendResult processSendResponse(const rmq::MessageQueue& message_queue,
                                          const SendMessageResponse& response,
                                          std::error_code& ec);
 
@@ -242,7 +236,7 @@
   grpc::ChannelArguments channel_arguments_;
 
   bool trace_{false};
-  bool withSsl_;
+  bool with_ssl_;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/include/SendResult.h b/cpp/source/client/include/SendResult.h
new file mode 100644
index 0000000..d5f8527
--- /dev/null
+++ b/cpp/source/client/include/SendResult.h
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <system_error>
+
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+/**
+ * @brief Outcome of an asynchronous SendMessage request, reported through SendResultCallback.
+ */
+struct SendResult {
+  // Error of the send attempt; left default-constructed (no error) on success.
+  std::error_code ec;
+  // Host the request was sent to.
+  std::string target;
+
+  // Identifiers taken from the first entry of a successful response.
+  std::string message_id;
+  std::string transaction_id;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/include/SendResultCallback.h b/cpp/source/client/include/SendResultCallback.h
new file mode 100644
index 0000000..3dd5bda
--- /dev/null
+++ b/cpp/source/client/include/SendResultCallback.h
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <functional>
+
+#include "SendResult.h"
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+/**
+ * @brief Callback through which ClientManager::send reports the result of an asynchronous send.
+ */
+using SendResultCallback = std::function<void(const SendResult&)>;
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/FifoContext.cpp b/cpp/source/rocketmq/FifoContext.cpp
new file mode 100644
index 0000000..f1affd1
--- /dev/null
+++ b/cpp/source/rocketmq/FifoContext.cpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "FifoContext.h"
+
+#include <utility>
+
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+// Take ownership of both the message and its completion callback.
+FifoContext::FifoContext(MessageConstPtr message, SendCallback callback)
+    : message(std::move(message)), callback(std::move(callback)) {
+}
+
+// Move, rather than copy, the callback: copying a std::function may allocate and throw, and an exception escaping
+// this noexcept constructor would call std::terminate.
+FifoContext::FifoContext(FifoContext&& rhs) noexcept
+    : message(std::move(rhs.message)), callback(std::move(rhs.callback)) {
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/FifoProducer.cpp b/cpp/source/rocketmq/FifoProducer.cpp
new file mode 100644
index 0000000..da43b58
--- /dev/null
+++ b/cpp/source/rocketmq/FifoProducer.cpp
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "rocketmq/FifoProducer.h"
+
+#include <algorithm>
+#include <cstddef>
+#include <memory>
+#include <utility>
+
+#include "FifoProducerImpl.h"
+#include "ProducerImpl.h"
+#include "StaticNameServerResolver.h"
+#include "rocketmq/Configuration.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+FifoProducerBuilder FifoProducer::newBuilder() {
+  return {};
+}
+
+FifoProducerBuilder::FifoProducerBuilder() : producer_impl_(std::make_shared<ProducerImpl>()) {
+}
+
+FifoProducerBuilder& FifoProducerBuilder::withConfiguration(Configuration configuration) {
+  auto name_server_resolver = std::make_shared<StaticNameServerResolver>(configuration.endpoints());
+  producer_impl_->withNameServerResolver(std::move(name_server_resolver));
+  producer_impl_->withCredentialsProvider(configuration.credentialsProvider());
+  producer_impl_->withRequestTimeout(configuration.requestTimeout());
+  producer_impl_->withSsl(configuration.withSsl());
+  return *this;
+}
+
+FifoProducerBuilder& FifoProducerBuilder::withTopics(const std::vector<std::string>& topics) {
+  producer_impl_->withTopics(topics);
+  return *this;
+}
+
+FifoProducerBuilder& FifoProducerBuilder::withConcurrency(std::size_t concurrency) {
+  // FifoProducerImpl routes each message to partition `hash(group) % concurrency`; clamp to at least one partition
+  // so that a zero argument cannot cause a division by zero.
+  this->impl_ = std::make_shared<FifoProducerImpl>(producer_impl_, std::max<std::size_t>(concurrency, 1));
+  return *this;
+}
+
+FifoProducer FifoProducerBuilder::build() {
+  // Without withConcurrency() impl_ would still be null and start() would dereference it; default to one partition.
+  if (!impl_) {
+    withConcurrency(1);
+  }
+  FifoProducer fifo_producer(impl_);
+  fifo_producer.start();
+  return fifo_producer;
+}
+
+void FifoProducer::start() {
+  impl_->internalProducer()->start();
+}
+
+void FifoProducer::send(MessageConstPtr message, SendCallback callback) {
+  impl_->send(std::move(message), std::move(callback));
+}
+
+ROCKETMQ_NAMESPACE_END
diff --git a/cpp/source/rocketmq/FifoProducerImpl.cpp b/cpp/source/rocketmq/FifoProducerImpl.cpp
new file mode 100644
index 0000000..2b33345
--- /dev/null
+++ b/cpp/source/rocketmq/FifoProducerImpl.cpp
@@ -0,0 +1,27 @@
+#include "FifoProducerImpl.h"
+
+#include <cstddef>
+#include <utility>
+
+#include "FifoContext.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+/**
+ * Route the message to the partition selected by hashing its message group: all messages of one group go through the
+ * same FifoProducerPartition and are therefore published in submission order. The message and the callback are moved
+ * into that partition.
+ */
+void FifoProducerImpl::send(MessageConstPtr message, SendCallback callback) {
+  // const& binds correctly whether group() returns a reference or a value.
+  const auto& group = message->group();
+  const std::size_t slot = hash_fn_(group) % partitions_.size();
+
+  FifoContext context(std::move(message), std::move(callback));
+  partitions_[slot]->add(std::move(context));
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/FifoProducerPartition.cpp b/cpp/source/rocketmq/FifoProducerPartition.cpp
new file mode 100644
index 0000000..94e1c72
--- /dev/null
+++ b/cpp/source/rocketmq/FifoProducerPartition.cpp
@@ -0,0 +1,111 @@
+#include "FifoProducerPartition.h"
+
+#include <absl/synchronization/mutex.h>
+
+#include <atomic>
+#include <memory>
+#include <system_error>
+#include <utility>
+
+#include "FifoContext.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
+#include "rocketmq/SendReceipt.h"
+#include "spdlog/spdlog.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+/**
+ * Append a message to the tail of this partition, then try to publish the head of the queue.
+ */
+void FifoProducerPartition::add(FifoContext&& context) {
+  {
+    absl::MutexLock lk(&messages_mtx_);
+    messages_.emplace_back(std::move(context));
+    SPDLOG_DEBUG("{} has {} pending messages after #add", name_, messages_.size());
+  }
+
+  trySend();
+}
+
+/**
+ * Publish the head of the queue unless another message of this partition is already in flight.
+ *
+ * inflight_ allows at most one outstanding send per partition, which is what keeps messages of the same group in
+ * order. It is claimed here and released either below, when the queue turns out to be empty, or in onComplete().
+ */
+void FifoProducerPartition::trySend() {
+  bool expected = false;
+  if (!inflight_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) {
+    SPDLOG_DEBUG("There is an inflight message");
+    return;
+  }
+
+  MessageConstPtr message;
+  SendCallback send_callback;
+  {
+    absl::MutexLock lk(&messages_mtx_);
+    if (messages_.empty()) {
+      // Release the flag, otherwise it stays set forever and every later add() merely enqueues. Doing so while
+      // holding the lock guarantees that an add() enqueuing after this check observes the cleared flag.
+      inflight_.store(false, std::memory_order_relaxed);
+      SPDLOG_DEBUG("There is no more messages to send");
+      return;
+    }
+
+    FifoContext& ctx = messages_.front();
+    message = std::move(ctx.message);
+    send_callback = std::move(ctx.callback);
+    messages_.pop_front();
+    SPDLOG_DEBUG("In addition to the inflight one, there is {} messages pending in {}", messages_.size(), name_);
+  }
+
+  // Send without holding messages_mtx_: the producer may run the callback synchronously (e.g. when it is not
+  // running), and onComplete() locks messages_mtx_ again, which would self-deadlock on this non-reentrant mutex.
+  std::shared_ptr<FifoProducerPartition> partition = shared_from_this();
+  auto fifo_callback = [partition, send_callback](const std::error_code& ec, const SendReceipt& receipt) mutable {
+    partition->onComplete(ec, receipt, send_callback);
+  };
+  SPDLOG_DEBUG("Sending FIFO message from {}", name_);
+  producer_->send(std::move(message), std::move(fifo_callback));
+}
+
+/**
+ * Handle the outcome of the in-flight send.
+ *
+ * A failed message that comes back through the receipt is put back at the head of the queue, so it is retried before
+ * anything queued after it. Otherwise the outcome is reported to the application callback. Either way the in-flight
+ * flag is released and the next pending message, if any, is sent.
+ */
+void FifoProducerPartition::onComplete(const std::error_code& ec, const SendReceipt& receipt, SendCallback& callback) {
+  if (ec) {
+    SPDLOG_INFO("{} completed with a failure: {}", name_, ec.message());
+  } else {
+    SPDLOG_DEBUG("{} completed OK", name_);
+  }
+
+  if (ec && receipt.message) {
+    // The callback only receives a const reference, yet the producer hands the message back through the receipt;
+    // const_cast reclaims it (assumes the producer never passes a genuinely const SendReceipt object).
+    // NOTE(review): retries are unbounded and immediate. If the producer keeps failing synchronously, trySend() and
+    // onComplete() recurse without limit; consider a retry limit or a backoff.
+    SendReceipt& receipt_mut = const_cast<SendReceipt&>(receipt);
+    FifoContext retry_context(std::move(receipt_mut.message), callback);
+    absl::MutexLock lk(&messages_mtx_);
+    messages_.emplace_front(std::move(retry_context));
+  } else {
+    // Success, or a failure without a message to re-queue: report the outcome to the application.
+    callback(ec, receipt);
+  }
+
+  // Release the in-flight flag and continue with the next pending message, if any.
+  bool expected = true;
+  if (inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) {
+    trySend();
+  } else {
+    SPDLOG_ERROR("{}: Unexpected inflight status", name_);
+  }
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp
index 78d812e..907d0a2 100644
--- a/cpp/source/rocketmq/Producer.cpp
+++ b/cpp/source/rocketmq/Producer.cpp
@@ -21,12 +21,8 @@
 #include <system_error>
 #include <utility>
 
-#include "rocketmq/Logger.h"
-#include "spdlog/spdlog.h"
-#include "MixAll.h"
 #include "ProducerImpl.h"
 #include "StaticNameServerResolver.h"
-#include "absl/strings/str_split.h"
 #include "rocketmq/ErrorCode.h"
 #include "rocketmq/SendReceipt.h"
 #include "rocketmq/Transaction.h"
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp b/cpp/source/rocketmq/ProducerImpl.cpp
index 7313016..34975e7 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -17,38 +17,27 @@
 #include "ProducerImpl.h"
 
 #include <algorithm>
-#include <apache/rocketmq/v2/definition.pb.h>
-
 #include <atomic>
 #include <cassert>
 #include <chrono>
-#include <limits>
 #include <memory>
 #include <system_error>
 #include <utility>
 
-#include "Client.h"
-#include "MessageGroupQueueSelector.h"
-#include "MetadataConstants.h"
+#include "apache/rocketmq/v2/definition.pb.h"
 #include "MixAll.h"
 #include "Protocol.h"
 #include "PublishInfoCallback.h"
-#include "RpcClient.h"
 #include "SendContext.h"
-#include "SendMessageContext.h"
 #include "Signature.h"
-#include "Tag.h"
 #include "TracingUtility.h"
 #include "TransactionImpl.h"
-#include "UniqueIdGenerator.h"
 #include "UtilAll.h"
-#include "absl/strings/str_join.h"
 #include "opencensus/trace/propagation/trace_context.h"
 #include "opencensus/trace/span.h"
 #include "rocketmq/ErrorCode.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/SendReceipt.h"
-#include "rocketmq/Tracing.h"
 #include "rocketmq/Transaction.h"
 #include "rocketmq/TransactionChecker.h"
 
@@ -203,19 +192,28 @@
 SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec) noexcept {
   ensureRunning(ec);
   if (ec) {
-    return {};
+    SPDLOG_WARN("Producer is not running");
+    SendReceipt send_receipt{};
+    send_receipt.message = std::move(message);
+    return send_receipt;
   }
 
   auto topic_publish_info = getPublishInfo(message->topic());
   if (!topic_publish_info) {
+    SPDLOG_WARN("Route of topic[{}] is not found", message->topic());
     ec = ErrorCode::NotFound;
-    return {};
+    SendReceipt send_receipt{};
+    send_receipt.message = std::move(message);
+    return send_receipt;
   }
 
   std::vector<rmq::MessageQueue> message_queue_list;
   if (!topic_publish_info->selectMessageQueues(absl::make_optional<std::string>(), message_queue_list)) {
+    SPDLOG_WARN("Failed to select an addressable message queue for topic[{}]", message->topic());
     ec = ErrorCode::NotFound;
-    return {};
+    SendReceipt send_receipt{};
+    send_receipt.message = std::move(message);
+    return send_receipt;
   }
 
   auto mtx = std::make_shared<absl::Mutex>();
@@ -224,9 +222,15 @@
   SendReceipt   send_receipt;

   // Define callback
   auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt& receipt) {
     ec = code;
-    send_receipt = receipt;
+    // SendReceipt now owns the message and can no longer be copy-assigned as a whole: copy the result fields
+    // explicitly, then reclaim the message (the producer's callers pass non-const receipt objects).
+    SendReceipt& receipt_mut = const_cast<SendReceipt&>(receipt);
+    send_receipt.target = receipt.target;
+    send_receipt.message_id = receipt.message_id;
+    send_receipt.transaction_id = receipt.transaction_id;
+    send_receipt.message = std::move(receipt_mut.message);
     {
       absl::MutexLock lk(mtx.get());
       completed = true;
@@ -251,6 +250,10 @@
   ensureRunning(ec);
   if (ec) {
     SendReceipt send_receipt;
+    send_receipt.message = std::move(message);
     cb(ec, send_receipt);
+    // cb now owns both the error and the message; do not fall through to the regular send path, which would run
+    // with a moved-from message and report a second outcome.
+    return;
   }
 
@@ -264,6 +264,7 @@
     // No route entries of the given topic is available
     if (ec) {
       SendReceipt send_receipt;
+      send_receipt.message = std::move(ptr);
       cb(ec, send_receipt);
       return;
     }
@@ -271,6 +272,7 @@
     if (!publish_info) {
       std::error_code ec = ErrorCode::NotFound;
       SendReceipt     send_receipt;
+      send_receipt.message = std::move(ptr);
       cb(ec, send_receipt);
       return;
     }
@@ -280,6 +282,7 @@
     if (!publish_info->selectMessageQueues(ptr->group(), message_queue_list)) {
       std::error_code ec = ErrorCode::NotFound;
       SendReceipt     send_receipt;
+      send_receipt.message = std::move(ptr);
       cb(ec, send_receipt);
       return;
     }
@@ -338,12 +341,12 @@
   Metadata metadata;
   Signature::sign(client_config_, metadata);
 
-  auto callback = [context](const std::error_code& ec, const SendReceipt& send_receipt) {
-    if (ec) {
-      context->onFailure(ec);
+  auto callback = [context](const SendResult& send_result) {
+    if (send_result.ec) {
+      context->onFailure(send_result.ec);
       return;
     }
-    context->onSuccess(send_receipt);
+    context->onSuccess(send_result);
   };
 
   client_manager_->send(target, metadata, request, callback);
@@ -354,12 +357,14 @@
   std::error_code ec;
   validate(*message, ec);
   if (ec) {
+    send_receipt.message = std::move(message);
     callback(ec, send_receipt);
     return;
   }
 
   if (list.empty()) {
     ec = ErrorCode::NotFound;
+    send_receipt.message = std::move(message);
     callback(ec, send_receipt);
     return;
   }
diff --git a/cpp/source/rocketmq/SendContext.cpp b/cpp/source/rocketmq/SendContext.cpp
index 385a1a9..bd97384 100644
--- a/cpp/source/rocketmq/SendContext.cpp
+++ b/cpp/source/rocketmq/SendContext.cpp
@@ -21,41 +21,44 @@
 #include "ProducerImpl.h"
 #include "PublishStats.h"
 #include "Tag.h"
-#include "TransactionImpl.h"
-#include "opencensus/trace/propagation/trace_context.h"
 #include "opencensus/trace/span.h"
-#include "rocketmq/Logger.h"
+#include "rocketmq/ErrorCode.h"
 #include "rocketmq/SendReceipt.h"
 #include "spdlog/spdlog.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-void SendContext::onSuccess(const SendReceipt& send_receipt) noexcept {
+void SendContext::onSuccess(const SendResult& send_result) noexcept {
   {
     // Mark end of send-message span.
     span_.SetStatus(opencensus::trace::StatusCode::OK);
     span_.End();
   }
 
-  auto publisher = producer_.lock();
-  if (!publisher) {
+  auto producer = producer_.lock();
+  if (!producer) {
+    SPDLOG_WARN("Producer has been destructed");
     return;
   }
 
   // Collect metrics
   {
     auto duration = std::chrono::steady_clock::now() - request_time_;
-    opencensus::stats::Record({{publisher->stats().latency(), MixAll::millisecondsOf(duration)}},
+    opencensus::stats::Record({{producer->stats().latency(), MixAll::millisecondsOf(duration)}},
                               {
                                   {Tag::topicTag(), message_->topic()},
-                                  {Tag::clientIdTag(), publisher->config().client_id},
+                                  {Tag::clientIdTag(), producer->config().client_id},
                                   {Tag::invocationStatusTag(), "success"},
                               });
   }
 
   // send_receipt.traceContext(opencensus::trace::propagation::ToTraceParentHeader(span_.context()));
-  std::error_code ec;
-  callback_(ec, send_receipt);
+  SendReceipt send_receipt = {};
+  send_receipt.target = send_result.target;
+  send_receipt.message_id = send_result.message_id;
+  send_receipt.transaction_id = send_result.transaction_id;
+  send_receipt.message = std::move(message_);
+  callback_(send_result.ec, send_receipt);
 }
 
 void SendContext::onFailure(const std::error_code& ec) noexcept {
@@ -65,38 +68,36 @@
     span_.End();
   }
 
-  auto publisher = producer_.lock();
-  if (!publisher) {
+  auto producer = producer_.lock();
+  if (!producer) {
+    SPDLOG_WARN("Producer has been destructed");
     return;
   }
 
   // Collect metrics
   {
     auto duration = std::chrono::steady_clock::now() - request_time_;
-    opencensus::stats::Record({{publisher->stats().latency(), MixAll::millisecondsOf(duration)}},
+    opencensus::stats::Record({{producer->stats().latency(), MixAll::millisecondsOf(duration)}},
                               {
                                   {Tag::topicTag(), message_->topic()},
-                                  {Tag::clientIdTag(), publisher->config().client_id},
+                                  {Tag::clientIdTag(), producer->config().client_id},
                                   {Tag::invocationStatusTag(), "failure"},
                               });
   }
 
-  if (++attempt_times_ >= publisher->maxAttemptTimes()) {
-    SPDLOG_WARN("Retried {} times, which exceeds the limit: {}", attempt_times_, publisher->maxAttemptTimes());
-    callback_(ec, {});
-    return;
-  }
-
-  std::shared_ptr<ProducerImpl> producer = producer_.lock();
-  if (!producer) {
-    SPDLOG_WARN("Producer has been destructed");
-    callback_(ec, {});
+  if (++attempt_times_ >= producer->maxAttemptTimes()) {
+    SPDLOG_WARN("Retried {} times, which exceeds the limit: {}", attempt_times_, producer->maxAttemptTimes());
+    SendReceipt receipt{};
+    receipt.message = std::move(message_);
+    callback_(ec, receipt);
     return;
   }
 
   if (candidates_.empty()) {
     SPDLOG_WARN("No alternative hosts to perform additional retries");
-    callback_(ec, {});
+    SendReceipt receipt{};
+    receipt.message = std::move(message_);
+    callback_(ec, receipt);
     return;
   }
 
@@ -106,7 +107,7 @@
   auto ctx = shared_from_this();
   // If publish message requests are throttled, retry after backoff
   if (ErrorCode::TooManyRequests == ec) {
-    auto&& backoff = publisher->backoff(attempt_times_);
+    auto&& backoff = producer->backoff(attempt_times_);
     SPDLOG_DEBUG("Publish message[topic={}, message-id={}] is throttled. Retry after {}ms", message_->topic(),
                  message_->id(), MixAll::millisecondsOf(backoff));
     auto retry_cb = [=]() { producer->sendImpl(ctx); };
diff --git a/cpp/source/rocketmq/include/ClientImpl.h b/cpp/source/rocketmq/include/ClientImpl.h
index c266047..d769396 100644
--- a/cpp/source/rocketmq/include/ClientImpl.h
+++ b/cpp/source/rocketmq/include/ClientImpl.h
@@ -94,8 +94,8 @@
     client_config_.request_timeout = absl::FromChrono(request_timeout);
   }
 
-  void withSsl(bool enable) {
-    client_config_.withSsl = enable;
+  void withSsl(bool with_ssl) {
+    client_config_.withSsl = with_ssl;
   }
 
   /**
diff --git a/cpp/source/rocketmq/include/FifoContext.h b/cpp/source/rocketmq/include/FifoContext.h
new file mode 100644
index 0000000..55812ba
--- /dev/null
+++ b/cpp/source/rocketmq/include/FifoContext.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+struct FifoContext {  // One pending FIFO send: the message plus the application callback for its result.
+  MessageConstPtr message;  // Owned here until FifoProducerPartition moves it into the producer.
+  SendCallback callback;    // Application callback that receives the send result for this message.
+
+  FifoContext(MessageConstPtr message, SendCallback callback);  // Both arguments are taken by value.
+
+  FifoContext(FifoContext&& rhs) noexcept;  // Move-only: declaring this makes the implicit copy constructor deleted.
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/FifoProducerImpl.h b/cpp/source/rocketmq/include/FifoProducerImpl.h
new file mode 100644
index 0000000..180c3f9
--- /dev/null
+++ b/cpp/source/rocketmq/include/FifoProducerImpl.h
@@ -0,0 +1,53 @@
+#pragma once
+
+#include <cstddef>
+#include <functional>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "FifoProducerPartition.h"
+#include "ProducerImpl.h"
+#include "fmt/format.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+/**
+ * FIFO producer layered on top of ProducerImpl.
+ *
+ * Messages are spread over a fixed set of FifoProducerPartition instances by hashing their message group. Each
+ * partition publishes one message at a time, so messages of the same group are sent in submission order, while
+ * distinct partitions publish independently of each other.
+ */
+class FifoProducerImpl : public std::enable_shared_from_this<FifoProducerImpl> {
+public:
+  /**
+   * @param producer    Underlying producer shared by every partition.
+   * @param concurrency Number of partitions. 0 is treated as 1, since send() picks a partition by taking the group
+   *                    hash modulo this number.
+   */
+  FifoProducerImpl(std::shared_ptr<ProducerImpl> producer, std::size_t concurrency)
+      : producer_(std::move(producer)), concurrency_(concurrency == 0 ? 1 : concurrency), partitions_(concurrency_) {
+    for (std::size_t i = 0; i < concurrency_; i++) {
+      partitions_[i] = std::make_shared<FifoProducerPartition>(producer_, fmt::format("slot-{}", i));
+    }
+  }
+
+  void send(MessageConstPtr message, SendCallback callback);
+
+  std::shared_ptr<ProducerImpl>& internalProducer() {
+    return producer_;
+  }
+
+private:
+  std::shared_ptr<ProducerImpl> producer_;
+  // Declared before partitions_ because the constructor sizes partitions_ from it.
+  std::size_t concurrency_;
+  std::vector<std::shared_ptr<FifoProducerPartition>> partitions_;
+  std::hash<std::string> hash_fn_;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/FifoProducerPartition.h b/cpp/source/rocketmq/include/FifoProducerPartition.h
new file mode 100644
index 0000000..96bb96f
--- /dev/null
+++ b/cpp/source/rocketmq/include/FifoProducerPartition.h
@@ -0,0 +1,58 @@
+#pragma once
+
+#include <absl/base/internal/thread_annotations.h>
+
+#include <atomic>
+#include <list>
+#include <memory>
+#include <string>
+#include <system_error>
+#include <utility>
+
+#include "FifoContext.h"
+#include "ProducerImpl.h"
+#include "absl/base/thread_annotations.h"
+#include "absl/synchronization/mutex.h"
+#include "rocketmq/SendCallback.h"
+#include "rocketmq/SendReceipt.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+/**
+ * Queue of pending FIFO messages that are published strictly one at a time through a shared ProducerImpl.
+ *
+ * Instances must be owned by a std::shared_ptr: in-flight send callbacks keep the partition alive through
+ * shared_from_this().
+ */
+class FifoProducerPartition : public std::enable_shared_from_this<FifoProducerPartition> {
+public:
+  FifoProducerPartition(std::shared_ptr<ProducerImpl> producer, std::string&& name)
+      : producer_(std::move(producer)), name_(std::move(name)) {
+  }
+
+  /**
+   * Append a message to the tail of the queue, then try to send.
+   */
+  void add(FifoContext&& context) LOCKS_EXCLUDED(messages_mtx_);
+
+  /**
+   * Send the head of the queue unless another message of this partition is already in flight.
+   */
+  void trySend() LOCKS_EXCLUDED(messages_mtx_);
+
+  /**
+   * Completion handler of the in-flight send. It acquires messages_mtx_ itself, so callers must not hold it.
+   */
+  void onComplete(const std::error_code& ec, const SendReceipt& receipt, SendCallback& callback)
+      LOCKS_EXCLUDED(messages_mtx_);
+
+private:
+  std::shared_ptr<ProducerImpl> producer_;
+  std::list<FifoContext> messages_ GUARDED_BY(messages_mtx_);
+  absl::Mutex messages_mtx_;
+  // Set while a message of this partition is being sent, so at most one send is outstanding at any time.
+  std::atomic_bool inflight_{false};
+  std::string name_;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/ProducerImpl.h b/cpp/source/rocketmq/include/ProducerImpl.h
index d7260a9..b572f20 100644
--- a/cpp/source/rocketmq/include/ProducerImpl.h
+++ b/cpp/source/rocketmq/include/ProducerImpl.h
@@ -16,28 +16,23 @@
  */
 #pragma once
 
-#include <chrono>
 #include <memory>
-#include <mutex>
 #include <string>
 #include <system_error>
 
 #include "ClientImpl.h"
-#include "ClientManagerImpl.h"
 #include "MixAll.h"
 #include "PublishInfoCallback.h"
+#include "PublishStats.h"
 #include "SendContext.h"
 #include "TopicPublishInfo.h"
 #include "TransactionImpl.h"
 #include "absl/container/flat_hash_map.h"
 #include "absl/container/flat_hash_set.h"
-#include "absl/strings/string_view.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/SendCallback.h"
 #include "rocketmq/SendReceipt.h"
-#include "rocketmq/State.h"
 #include "rocketmq/TransactionChecker.h"
-#include "PublishStats.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -53,8 +48,22 @@
 
   void shutdown() override;
 
+  /**
+   * Note: the application must transfer ownership of the message to send, which prevents concurrent modification
+   * while the message is being sent.
+   *
+   * Regardless of the send result, the SendReceipt carries the std::unique_ptr<const Message> back, allowing the
+   * application to apply a customized retry policy.
+   */
   SendReceipt send(MessageConstPtr message, std::error_code& ec) noexcept;
 
+  /**
+   * Note: the application must transfer ownership of the message to send, which prevents concurrent modification
+   * while the message is being sent.
+   *
+   * Regardless of the send result, the SendReceipt carries the std::unique_ptr<const Message> back, allowing the
+   * application to apply a customized retry policy.
+   */
   void send(MessageConstPtr message, SendCallback callback);
 
   void setTransactionChecker(TransactionChecker checker);
@@ -64,6 +73,13 @@
     return absl::make_unique<TransactionImpl>(producer);
   }
 
+  /**
+   * Note: the application must transfer ownership of the message to send, which prevents concurrent modification
+   * while the message is being sent.
+   *
+   * TODO: Refine this API. The current API cannot hand the message back to its caller when publishing
+   * fails.
+   */
   void send(MessageConstPtr message, std::error_code& ec, Transaction& transaction);
 
   /**
diff --git a/cpp/source/rocketmq/include/SendContext.h b/cpp/source/rocketmq/include/SendContext.h
index 4c05ceb..4067532 100644
--- a/cpp/source/rocketmq/include/SendContext.h
+++ b/cpp/source/rocketmq/include/SendContext.h
@@ -19,16 +19,12 @@
 #include <memory>
 #include <system_error>
 
-#include "absl/container/flat_hash_map.h"
-#include "absl/synchronization/mutex.h"
-#include "opencensus/trace/span.h"
-
 #include "Protocol.h"
+#include "SendResult.h"
 #include "TransactionImpl.h"
-#include "rocketmq/ErrorCode.h"
+#include "opencensus/trace/span.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/SendCallback.h"
-#include "rocketmq/SendReceipt.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -47,7 +43,7 @@
         span_(opencensus::trace::Span::BlankSpan()) {
   }
 
-  void onSuccess(const SendReceipt& send_receipt) noexcept;
+  void onSuccess(const SendResult& send_result) noexcept;
 
   void onFailure(const std::error_code& ec) noexcept;
 
diff --git a/cpp/tools/trouble_shooting.sh b/cpp/tools/trouble_shooting.sh
old mode 100644
new mode 100755