Fifo opt (#732)
* Prepare to optimize FIFO publishing
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
* fix: SendReceipt now contains std::unique_ptr<Message> being sent
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
* fix: add doc explaining why we taking ownership of the message being sent
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
* feat: implement FifoProducerPartition
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
* feat: implement FifoProducerImpl
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
* feat: implement builder for FifoProducer
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
* fix: prepare to debug
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
* fix: log sending sending stages
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
---------
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
diff --git a/cpp/examples/CMakeLists.txt b/cpp/examples/CMakeLists.txt
index 8d6b039..2730447 100644
--- a/cpp/examples/CMakeLists.txt
+++ b/cpp/examples/CMakeLists.txt
@@ -4,6 +4,7 @@
endfunction()
add_example(example_producer ExampleProducer.cpp)
+add_example(example_fifo_producer ExampleFifoProducer.cpp)
add_example(example_producer_with_async ExampleProducerWithAsync.cpp)
add_example(example_producer_with_fifo_message ExampleProducerWithFifoMessage.cpp)
add_example(example_producer_with_timed_message ExampleProducerWithTimedMessage.cpp)
diff --git a/cpp/examples/ExampleFifoProducer.cpp b/cpp/examples/ExampleFifoProducer.cpp
new file mode 100644
index 0000000..9d99be3
--- /dev/null
+++ b/cpp/examples/ExampleFifoProducer.cpp
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <algorithm>
+#include <atomic>
+#include <condition_variable>
+#include <iostream>
+#include <memory>
+#include <random>
+#include <string>
+#include <system_error>
+
+#include "gflags/gflags.h"
+#include "rocketmq/CredentialsProvider.h"
+#include "rocketmq/FifoProducer.h"
+#include "rocketmq/Logger.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/Producer.h"
+#include "rocketmq/SendReceipt.h"
+
+using namespace ROCKETMQ_NAMESPACE;
+
+/**
+ * @brief A simple Semaphore to limit request concurrency.
+ */
+class Semaphore {
+public:
+ Semaphore(std::size_t permits) : permits_(permits) {
+ }
+
+ /**
+ * @brief Acquire a permit.
+ */
+ void acquire() {
+ while (true) {
+ std::unique_lock<std::mutex> lk(mtx_);
+ if (permits_ > 0) {
+ permits_--;
+ return;
+ }
+ cv_.wait(lk, [this]() { return permits_ > 0; });
+ }
+ }
+
+ /**
+ * @brief Release the permit back to semaphore.
+ */
+ void release() {
+ std::unique_lock<std::mutex> lk(mtx_);
+ permits_++;
+ if (1 == permits_) {
+ cv_.notify_one();
+ }
+ }
+
+private:
+ std::size_t permits_{0};
+ std::mutex mtx_;
+ std::condition_variable cv_;
+};
+
+const std::string& alphaNumeric() {
+ static std::string alpha_numeric("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ");
+ return alpha_numeric;
+}
+
+std::string randomString(std::string::size_type len) {
+ std::string result;
+ result.reserve(len);
+ std::random_device rd;
+ std::mt19937 generator(rd());
+ std::string source(alphaNumeric());
+ std::string::size_type generated = 0;
+ while (generated < len) {
+ std::shuffle(source.begin(), source.end(), generator);
+ std::string::size_type delta = std::min({len - generated, source.length()});
+ result.append(source.substr(0, delta));
+ generated += delta;
+ }
+ return result;
+}
+
+DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published");
+DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
+DEFINE_int32(message_body_size, 4096, "Message body size");
+DEFINE_uint32(total, 256, "Number of sample messages to publish");
+DEFINE_string(access_key, "", "Your access key ID");
+DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
+DEFINE_uint32(concurrency, 16, "Concurrency of FIFO producer");
+
+int main(int argc, char* argv[]) {
+ gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+ auto& logger = getLogger();
+ logger.setConsoleLevel(Level::Debug);
+ logger.setLevel(Level::Debug);
+ logger.init();
+
+ // Access Key/Secret pair may be acquired from management console
+ CredentialsProviderPtr credentials_provider;
+ if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
+ credentials_provider = std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret);
+ }
+
+ // In most case, you don't need to create too many producers, singleton pattern is recommended.
+ auto producer = FifoProducer::newBuilder()
+ .withConfiguration(Configuration::newBuilder()
+ .withEndpoints(FLAGS_access_point)
+ .withCredentialsProvider(credentials_provider)
+ .withSsl(FLAGS_tls)
+ .build())
+ .withConcurrency(FLAGS_concurrency)
+ .withTopics({FLAGS_topic})
+ .build();
+
+ std::atomic_bool stopped;
+ std::atomic_long count(0);
+
+ auto stats_lambda = [&] {
+ while (!stopped.load(std::memory_order_relaxed)) {
+ long cnt = count.load(std::memory_order_relaxed);
+ while (!count.compare_exchange_weak(cnt, 0)) {
+ cnt = count.load(std::memory_order_relaxed);
+ }
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ std::cout << "QPS: " << cnt << std::endl;
+ }
+ };
+
+ std::thread stats_thread(stats_lambda);
+
+ std::string body = randomString(FLAGS_message_body_size);
+
+ std::size_t completed = 0;
+ std::mutex mtx;
+ std::condition_variable cv;
+
+ std::unique_ptr<Semaphore> semaphore(new Semaphore(FLAGS_concurrency));
+
+ try {
+ for (std::size_t i = 0; i < FLAGS_total; ++i) {
+ auto message = Message::newBuilder()
+ .withTopic(FLAGS_topic)
+ .withTag("TagA")
+ .withKeys({"Key-" + std::to_string(i)})
+ .withGroup("message-group" + std::to_string(i % FLAGS_concurrency))
+ .withBody(body)
+ .build();
+ std::error_code ec;
+ auto callback = [&](const std::error_code& ec, const SendReceipt& receipt) mutable {
+ completed++;
+ count++;
+ semaphore->release();
+
+ if (completed >= FLAGS_total) {
+ cv.notify_all();
+ }
+ };
+
+ semaphore->acquire();
+ producer.send(std::move(message), callback);
+ std::cout << "Cached No." << i << " message" << std::endl;
+ }
+ } catch (...) {
+ std::cerr << "Ah...No!!!" << std::endl;
+ }
+
+ {
+ std::unique_lock<std::mutex> lk(mtx);
+ cv.wait(lk, [&]() { return completed >= FLAGS_total; });
+ std::cout << "Completed: " << completed << ", total: " << FLAGS_total << std::endl;
+ }
+
+ stopped.store(true, std::memory_order_relaxed);
+ if (stats_thread.joinable()) {
+ stats_thread.join();
+ }
+
+ return EXIT_SUCCESS;
+}
diff --git a/cpp/examples/ExampleProducerWithAsync.cpp b/cpp/examples/ExampleProducerWithAsync.cpp
index 63b7611..d88dfc8 100644
--- a/cpp/examples/ExampleProducerWithAsync.cpp
+++ b/cpp/examples/ExampleProducerWithAsync.cpp
@@ -17,7 +17,6 @@
#include <algorithm>
#include <atomic>
#include <condition_variable>
-#include <cstdint>
#include <iostream>
#include <mutex>
#include <random>
diff --git a/cpp/examples/ExamplePushConsumer.cpp b/cpp/examples/ExamplePushConsumer.cpp
index ab106cb..66a85f4 100644
--- a/cpp/examples/ExamplePushConsumer.cpp
+++ b/cpp/examples/ExamplePushConsumer.cpp
@@ -16,7 +16,6 @@
*/
#include <chrono>
#include <iostream>
-#include <mutex>
#include <thread>
#include "gflags/gflags.h"
diff --git a/cpp/examples/ExampleSimpleConsumer.cpp b/cpp/examples/ExampleSimpleConsumer.cpp
index 17a84b7..aedec71 100644
--- a/cpp/examples/ExampleSimpleConsumer.cpp
+++ b/cpp/examples/ExampleSimpleConsumer.cpp
@@ -16,7 +16,6 @@
*/
#include <chrono>
#include <iostream>
-#include <thread>
#include "gflags/gflags.h"
#include "rocketmq/Logger.h"
diff --git a/cpp/include/rocketmq/Configuration.h b/cpp/include/rocketmq/Configuration.h
index 0037c27..6dcd413 100644
--- a/cpp/include/rocketmq/Configuration.h
+++ b/cpp/include/rocketmq/Configuration.h
@@ -44,7 +44,7 @@
}
bool withSsl() const {
- return withSsl_;
+ return tls_;
}
protected:
@@ -56,7 +56,7 @@
std::string endpoints_;
CredentialsProviderPtr credentials_provider_;
std::chrono::milliseconds request_timeout_{ConfigurationDefaults::RequestTimeout};
- bool withSsl_ = true;
+ bool tls_ = true;
};
class ConfigurationBuilder {
@@ -67,7 +67,7 @@
ConfigurationBuilder& withRequestTimeout(std::chrono::milliseconds request_timeout);
- ConfigurationBuilder& withSsl(bool enable);
+ ConfigurationBuilder& withSsl(bool with_ssl);
Configuration build();
diff --git a/cpp/include/rocketmq/FifoProducer.h b/cpp/include/rocketmq/FifoProducer.h
new file mode 100644
index 0000000..fbf7662
--- /dev/null
+++ b/cpp/include/rocketmq/FifoProducer.h
@@ -0,0 +1,52 @@
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "Configuration.h"
+#include "Message.h"
+#include "RocketMQ.h"
+#include "SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class FifoProducerImpl;
+class FifoProducerBuilder;
+class ProducerImpl;
+
+class FifoProducer {
+public:
+ static FifoProducerBuilder newBuilder();
+
+ void send(MessageConstPtr message, SendCallback callback);
+
+private:
+ std::shared_ptr<FifoProducerImpl> impl_;
+
+ explicit FifoProducer(std::shared_ptr<FifoProducerImpl> impl) : impl_(std::move(impl)) {
+ }
+
+ void start();
+
+ friend class FifoProducerBuilder;
+};
+
+class FifoProducerBuilder {
+public:
+ FifoProducerBuilder();
+
+ FifoProducerBuilder& withConfiguration(Configuration configuration);
+
+ FifoProducerBuilder& withTopics(const std::vector<std::string>& topics);
+
+ FifoProducerBuilder& withConcurrency(std::size_t concurrency);
+
+ FifoProducer build();
+
+private:
+ std::shared_ptr<FifoProducerImpl> impl_;
+ std::shared_ptr<ProducerImpl> producer_impl_;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/include/rocketmq/Producer.h b/cpp/include/rocketmq/Producer.h
index 42004eb..6b42843 100644
--- a/cpp/include/rocketmq/Producer.h
+++ b/cpp/include/rocketmq/Producer.h
@@ -16,20 +16,17 @@
*/
#pragma once
-#include <chrono>
-#include <functional>
#include <memory>
#include <system_error>
#include <vector>
#include "Configuration.h"
-#include "ErrorCode.h"
-#include "Logger.h"
#include "Message.h"
#include "SendCallback.h"
#include "SendReceipt.h"
#include "Transaction.h"
#include "TransactionChecker.h"
+#include "rocketmq/Logger.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/cpp/include/rocketmq/SendReceipt.h b/cpp/include/rocketmq/SendReceipt.h
index 489df5e..7eef6e7 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/include/rocketmq/SendReceipt.h
@@ -16,20 +16,21 @@
*/
#pragma once
-#include <cstdint>
#include <string>
-#include <utility>
#include "RocketMQ.h"
+#include "rocketmq/Message.h"
ROCKETMQ_NAMESPACE_BEGIN
struct SendReceipt {
+ std::string target;
+
std::string message_id;
std::string transaction_id;
- std::string target;
+ MessageConstPtr message;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/base/Configuration.cpp b/cpp/source/base/Configuration.cpp
index 2a136d5..66cff2e 100644
--- a/cpp/source/base/Configuration.cpp
+++ b/cpp/source/base/Configuration.cpp
@@ -38,8 +38,8 @@
return *this;
}
-ConfigurationBuilder& ConfigurationBuilder::withSsl(bool enable) {
- configuration_.withSsl_ = enable;
+ConfigurationBuilder& ConfigurationBuilder::withSsl(bool with_ssl) {
+ configuration_.tls_ = with_ssl;
return *this;
}
diff --git a/cpp/source/client/ClientManagerImpl.cpp b/cpp/source/client/ClientManagerImpl.cpp
index 643d374..7d724c7 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -27,32 +27,27 @@
#include "InvocationContext.h"
#include "LogInterceptor.h"
#include "LogInterceptorFactory.h"
-#include "MessageExt.h"
-#include "MetadataConstants.h"
#include "MixAll.h"
#include "Protocol.h"
#include "ReceiveMessageContext.h"
#include "RpcClient.h"
#include "RpcClientImpl.h"
#include "Scheduler.h"
-#include "TlsHelper.h"
+#include "SchedulerImpl.h"
#include "UtilAll.h"
-#include "apache/rocketmq/v2/definition.pb.h"
#include "google/protobuf/util/time_util.h"
#include "grpcpp/create_channel.h"
#include "rocketmq/ErrorCode.h"
-#include "rocketmq/Logger.h"
-#include "rocketmq/SendReceipt.h"
#include "spdlog/spdlog.h"
ROCKETMQ_NAMESPACE_BEGIN
-ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool withSsl)
+ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool with_ssl)
: scheduler_(std::make_shared<SchedulerImpl>()),
resource_namespace_(std::move(resource_namespace)),
state_(State::CREATED),
callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(std::thread::hardware_concurrency())),
- withSsl_(withSsl) {
+ with_ssl_(with_ssl) {
certificate_verifier_ = grpc::experimental::ExternalCertificateVerifier::Create<InsecureCertificateVerifier>();
tls_channel_credential_options_.set_verify_server_certs(false);
tls_channel_credential_options_.set_check_call_host(false);
@@ -285,7 +280,7 @@
bool ClientManagerImpl::send(const std::string& target_host,
const Metadata& metadata,
SendMessageRequest& request,
- SendCallback cb) {
+ SendResultCallback cb) {
assert(cb);
SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}", target_host, request.DebugString());
RpcClientSharedPtr client = getRpcClient(target_host);
@@ -311,15 +306,14 @@
return;
}
- SendReceipt send_receipt = {};
- send_receipt.target = target_host;
- std::error_code ec;
+ SendResult send_result = {};
+ send_result.target = target_host;
if (!invocation_context->status.ok()) {
SPDLOG_WARN("Failed to send message to {} due to gRPC error. gRPC code: {}, gRPC error message: {}",
invocation_context->remote_address, invocation_context->status.error_code(),
invocation_context->status.error_message());
- ec = ErrorCode::RequestTimeout;
- cb(ec, send_receipt);
+ send_result.ec = ErrorCode::RequestTimeout;
+ cb(send_result);
return;
}
@@ -328,8 +322,8 @@
case rmq::Code::OK: {
if (!invocation_context->response.entries().empty()) {
auto first = invocation_context->response.entries().begin();
- send_receipt.message_id = first->message_id();
- send_receipt.transaction_id = first->transaction_id();
+ send_result.message_id = first->message_id();
+ send_result.transaction_id = first->transaction_id();
} else {
SPDLOG_ERROR("Unexpected send-message-response: {}", invocation_context->response.DebugString());
}
@@ -338,127 +332,127 @@
case rmq::Code::ILLEGAL_TOPIC: {
SPDLOG_ERROR("IllegalTopic: {}. Host={}", status.message(), invocation_context->remote_address);
- ec = ErrorCode::IllegalTopic;
+ send_result.ec = ErrorCode::IllegalTopic;
break;
}
case rmq::Code::ILLEGAL_MESSAGE_TAG: {
SPDLOG_ERROR("IllegalMessageTag: {}. Host={}", status.message(), invocation_context->remote_address);
- ec = ErrorCode::IllegalMessageTag;
+ send_result.ec = ErrorCode::IllegalMessageTag;
break;
}
case rmq::Code::ILLEGAL_MESSAGE_KEY: {
SPDLOG_ERROR("IllegalMessageKey: {}. Host={}", status.message(), invocation_context->remote_address);
- ec = ErrorCode::IllegalMessageKey;
+ send_result.ec = ErrorCode::IllegalMessageKey;
break;
}
case rmq::Code::ILLEGAL_MESSAGE_GROUP: {
SPDLOG_ERROR("IllegalMessageGroup: {}. Host={}", status.message(), invocation_context->remote_address);
- ec = ErrorCode::IllegalMessageGroup;
+ send_result.ec = ErrorCode::IllegalMessageGroup;
break;
}
case rmq::Code::ILLEGAL_MESSAGE_PROPERTY_KEY: {
SPDLOG_ERROR("IllegalMessageProperty: {}. Host={}", status.message(), invocation_context->remote_address);
- ec = ErrorCode::IllegalMessageProperty;
+ send_result.ec = ErrorCode::IllegalMessageProperty;
break;
}
case rmq::Code::MESSAGE_PROPERTIES_TOO_LARGE: {
SPDLOG_ERROR("MessagePropertiesTooLarge: {}. Host={}", status.message(), invocation_context->remote_address);
- ec = ErrorCode::MessagePropertiesTooLarge;
+ send_result.ec = ErrorCode::MessagePropertiesTooLarge;
break;
}
case rmq::Code::MESSAGE_BODY_TOO_LARGE: {
SPDLOG_ERROR("MessageBodyTooLarge: {}. Host={}", status.message(), invocation_context->remote_address);
- ec = ErrorCode::MessageBodyTooLarge;
+ send_result.ec = ErrorCode::MessageBodyTooLarge;
break;
}
case rmq::Code::TOPIC_NOT_FOUND: {
SPDLOG_WARN("TopicNotFound: {}. Host={}", status.message(), invocation_context->remote_address);
- ec = ErrorCode::TopicNotFound;
+ send_result.ec = ErrorCode::TopicNotFound;
break;
}
case rmq::Code::NOT_FOUND: {
SPDLOG_WARN("NotFound: {}. Host={}", status.message(), invocation_context->remote_address);
- ec = ErrorCode::NotFound;
+ send_result.ec = ErrorCode::NotFound;
break;
}
case rmq::Code::UNAUTHORIZED: {
SPDLOG_WARN("Unauthenticated: {}. Host={}", status.message(), invocation_context->remote_address);
- ec = ErrorCode::Unauthorized;
+ send_result.ec = ErrorCode::Unauthorized;
break;
}
case rmq::Code::FORBIDDEN: {
SPDLOG_WARN("Forbidden: {}. Host={}", status.message(), invocation_context->remote_address);
- ec = ErrorCode::Forbidden;
+ send_result.ec = ErrorCode::Forbidden;
break;
}
case rmq::Code::MESSAGE_CORRUPTED: {
SPDLOG_WARN("MessageCorrupted: {}. Host={}", status.message(), invocation_context->remote_address);
- ec = ErrorCode::MessageCorrupted;
+ send_result.ec = ErrorCode::MessageCorrupted;
break;
}
case rmq::Code::TOO_MANY_REQUESTS: {
SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), invocation_context->remote_address);
- ec = ErrorCode::TooManyRequests;
+ send_result.ec = ErrorCode::TooManyRequests;
break;
}
case rmq::Code::INTERNAL_SERVER_ERROR: {
SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address);
- ec = ErrorCode::InternalServerError;
+ send_result.ec = ErrorCode::InternalServerError;
break;
}
case rmq::Code::HA_NOT_AVAILABLE: {
SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address);
- ec = ErrorCode::InternalServerError;
+ send_result.ec = ErrorCode::InternalServerError;
break;
}
case rmq::Code::PROXY_TIMEOUT: {
SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
- ec = ErrorCode::GatewayTimeout;
+ send_result.ec = ErrorCode::GatewayTimeout;
break;
}
case rmq::Code::MASTER_PERSISTENCE_TIMEOUT: {
SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
- ec = ErrorCode::GatewayTimeout;
+ send_result.ec = ErrorCode::GatewayTimeout;
break;
}
case rmq::Code::SLAVE_PERSISTENCE_TIMEOUT: {
SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
- ec = ErrorCode::GatewayTimeout;
+ send_result.ec = ErrorCode::GatewayTimeout;
break;
}
case rmq::Code::MESSAGE_PROPERTY_CONFLICT_WITH_TYPE: {
SPDLOG_WARN("Message-property-conflict-with-type: Host={}, Response={}", invocation_context->remote_address,
invocation_context->response.DebugString());
- ec = ErrorCode::MessagePropertyConflictWithType;
+ send_result.ec = ErrorCode::MessagePropertyConflictWithType;
break;
}
default: {
SPDLOG_WARN("NotSupported: Check and upgrade SDK to the latest. Host={}", invocation_context->remote_address);
- ec = ErrorCode::NotSupported;
+ send_result.ec = ErrorCode::NotSupported;
break;
}
}
- cb(ec, send_receipt);
+ cb(send_result);
};
invocation_context->callback = completion_callback;
@@ -476,7 +470,7 @@
std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_factories;
interceptor_factories.emplace_back(absl::make_unique<LogInterceptorFactory>());
auto channel = grpc::experimental::CreateCustomChannelWithInterceptors(
- target_host, withSsl_ ? channel_credential_ : grpc::InsecureChannelCredentials(), channel_arguments_,
+ target_host, with_ssl_ ? channel_credential_ : grpc::InsecureChannelCredentials(), channel_arguments_,
std::move(interceptor_factories));
return channel;
}
@@ -520,28 +514,28 @@
rpc_clients_.clear();
}
-SendReceipt ClientManagerImpl::processSendResponse(const rmq::MessageQueue& message_queue,
- const SendMessageResponse& response,
- std::error_code& ec) {
- SendReceipt send_receipt;
+SendResult ClientManagerImpl::processSendResponse(const rmq::MessageQueue& message_queue,
+ const SendMessageResponse& response,
+ std::error_code& ec) {
+ SendResult send_result;
switch (response.status().code()) {
case rmq::Code::OK: {
assert(response.entries_size() > 0);
- send_receipt.message_id = response.entries().begin()->message_id();
- send_receipt.transaction_id = response.entries().begin()->transaction_id();
- return send_receipt;
+ send_result.message_id = response.entries().begin()->message_id();
+ send_result.transaction_id = response.entries().begin()->transaction_id();
+ return send_result;
}
case rmq::Code::ILLEGAL_TOPIC: {
ec = ErrorCode::BadRequest;
- return send_receipt;
+ return send_result;
}
default: {
// TODO: handle other cases.
break;
}
}
- return send_receipt;
+ return send_result;
}
void ClientManagerImpl::addClientObserver(std::weak_ptr<Client> client) {
diff --git a/cpp/source/client/include/ClientManager.h b/cpp/source/client/include/ClientManager.h
index 56325fa..02b232b 100644
--- a/cpp/source/client/include/ClientManager.h
+++ b/cpp/source/client/include/ClientManager.h
@@ -22,14 +22,12 @@
#include <system_error>
#include "Client.h"
-#include "MessageExt.h"
#include "Metadata.h"
#include "ReceiveMessageCallback.h"
#include "RpcClient.h"
#include "Scheduler.h"
-#include "TelemetryBidiReactor.h"
+#include "SendResultCallback.h"
#include "TopicRouteData.h"
-#include "rocketmq/SendCallback.h"
#include "rocketmq/State.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -93,8 +91,10 @@
virtual void receiveMessage(const std::string& target, const Metadata& metadata, const ReceiveMessageRequest& request,
std::chrono::milliseconds timeout, ReceiveMessageCallback callback) = 0;
- virtual bool send(const std::string& target_host, const Metadata& metadata, SendMessageRequest& request,
- SendCallback cb) = 0;
+ virtual bool send(const std::string& target_host,
+ const Metadata& metadata,
+ SendMessageRequest& request,
+ SendResultCallback cb) = 0;
virtual std::error_code notifyClientTermination(const std::string& target_host, const Metadata& metadata,
const NotifyClientTerminationRequest& request,
diff --git a/cpp/source/client/include/ClientManagerImpl.h b/cpp/source/client/include/ClientManagerImpl.h
index 653fcad..5f1b27c 100644
--- a/cpp/source/client/include/ClientManagerImpl.h
+++ b/cpp/source/client/include/ClientManagerImpl.h
@@ -20,7 +20,6 @@
#include <chrono>
#include <cstdint>
#include <functional>
-#include <future>
#include <memory>
#include <string>
#include <system_error>
@@ -29,18 +28,13 @@
#include "Client.h"
#include "ClientManager.h"
#include "InsecureCertificateVerifier.h"
-#include "InvocationContext.h"
#include "ReceiveMessageCallback.h"
#include "RpcClientImpl.h"
-#include "SchedulerImpl.h"
-#include "SendMessageContext.h"
-#include "TelemetryBidiReactor.h"
#include "ThreadPoolImpl.h"
#include "TopicRouteData.h"
#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
-#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
#include "rocketmq/State.h"
@@ -54,7 +48,7 @@
* effectively.
* @param resource_namespace Abstract resource namespace, in which this client manager lives.
*/
- explicit ClientManagerImpl(std::string resource_namespace, bool withSsl = true);
+ explicit ClientManagerImpl(std::string resource_namespace, bool with_ssl = true);
~ClientManagerImpl() override;
@@ -89,7 +83,7 @@
bool send(const std::string& target_host,
const Metadata& metadata,
SendMessageRequest& request,
- SendCallback cb) override LOCKS_EXCLUDED(rpc_clients_mtx_);
+ SendResultCallback cb) override LOCKS_EXCLUDED(rpc_clients_mtx_);
/**
* Get a RpcClient according to the given target hosts, which follows scheme specified
@@ -105,7 +99,7 @@
RpcClientSharedPtr getRpcClient(const std::string& target_host, bool need_heartbeat = true) override
LOCKS_EXCLUDED(rpc_clients_mtx_);
- static SendReceipt processSendResponse(const rmq::MessageQueue& message_queue,
+ static SendResult processSendResponse(const rmq::MessageQueue& message_queue,
const SendMessageResponse& response,
std::error_code& ec);
@@ -242,7 +236,7 @@
grpc::ChannelArguments channel_arguments_;
bool trace_{false};
- bool withSsl_;
+ bool with_ssl_;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/include/SendResult.h b/cpp/source/client/include/SendResult.h
new file mode 100644
index 0000000..d5f8527
--- /dev/null
+++ b/cpp/source/client/include/SendResult.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include <system_error>
+
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+struct SendResult {
+ std::error_code ec;
+ std::string target;
+
+ std::string message_id;
+ std::string transaction_id;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/include/SendResultCallback.h b/cpp/source/client/include/SendResultCallback.h
new file mode 100644
index 0000000..3dd5bda
--- /dev/null
+++ b/cpp/source/client/include/SendResultCallback.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#include <functional>
+
+#include "SendResult.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+using SendResultCallback = std::function<void(const SendResult&)>;
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/FifoContext.cpp b/cpp/source/rocketmq/FifoContext.cpp
new file mode 100644
index 0000000..f1affd1
--- /dev/null
+++ b/cpp/source/rocketmq/FifoContext.cpp
@@ -0,0 +1,16 @@
+#include "FifoContext.h"
+
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+FifoContext::FifoContext(MessageConstPtr message, SendCallback callback)
+ : message(std::move(message)), callback(callback) {
+}
+
+FifoContext::FifoContext(FifoContext&& rhs) noexcept {
+ this->message = std::move(rhs.message);
+ this->callback = rhs.callback;
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/FifoProducer.cpp b/cpp/source/rocketmq/FifoProducer.cpp
new file mode 100644
index 0000000..da43b58
--- /dev/null
+++ b/cpp/source/rocketmq/FifoProducer.cpp
@@ -0,0 +1,56 @@
+#include "rocketmq/FifoProducer.h"
+
+#include <cstddef>
+#include <memory>
+
+#include "FifoProducerImpl.h"
+#include "ProducerImpl.h"
+#include "StaticNameServerResolver.h"
+#include "rocketmq/Configuration.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+FifoProducerBuilder FifoProducer::newBuilder() {
+ return {};
+}
+
+FifoProducerBuilder::FifoProducerBuilder() : producer_impl_(std::make_shared<ProducerImpl>()) {
+}
+
+FifoProducerBuilder& FifoProducerBuilder::withConfiguration(Configuration configuration) {
+ auto name_server_resolver = std::make_shared<StaticNameServerResolver>(configuration.endpoints());
+ producer_impl_->withNameServerResolver(std::move(name_server_resolver));
+ producer_impl_->withCredentialsProvider(configuration.credentialsProvider());
+ producer_impl_->withRequestTimeout(configuration.requestTimeout());
+ producer_impl_->withSsl(configuration.withSsl());
+ return *this;
+}
+
+FifoProducerBuilder& FifoProducerBuilder::withTopics(const std::vector<std::string>& topics) {
+ producer_impl_->withTopics(topics);
+ return *this;
+}
+
+FifoProducerBuilder& FifoProducerBuilder::withConcurrency(std::size_t concurrency) {
+ this->impl_ = std::make_shared<FifoProducerImpl>(producer_impl_, concurrency);
+ return *this;
+}
+
+FifoProducer FifoProducerBuilder::build() {
+ FifoProducer fifo_producer(this->impl_);
+ fifo_producer.start();
+ return fifo_producer;
+}
+
+void FifoProducer::start() {
+ impl_->internalProducer()->start();
+}
+
+void FifoProducer::send(MessageConstPtr message, SendCallback callback) {
+ impl_->send(std::move(message), callback);
+}
+
+ROCKETMQ_NAMESPACE_END
diff --git a/cpp/source/rocketmq/FifoProducerImpl.cpp b/cpp/source/rocketmq/FifoProducerImpl.cpp
new file mode 100644
index 0000000..2b33345
--- /dev/null
+++ b/cpp/source/rocketmq/FifoProducerImpl.cpp
@@ -0,0 +1,21 @@
+#include "FifoProducerImpl.h"
+
+#include <utility>
+
+#include "FifoContext.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+void FifoProducerImpl::send(MessageConstPtr message, SendCallback callback) {
+ auto& group = message->group();
+ std::size_t hash = hash_fn_(group);
+ std::size_t slot = hash % concurrency_;
+
+ FifoContext context(std::move(message), callback);
+ partitions_[slot]->add(std::move(context));
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/FifoProducerPartition.cpp b/cpp/source/rocketmq/FifoProducerPartition.cpp
new file mode 100644
index 0000000..94e1c72
--- /dev/null
+++ b/cpp/source/rocketmq/FifoProducerPartition.cpp
@@ -0,0 +1,91 @@
+#include "FifoProducerPartition.h"
+
+#include <absl/synchronization/mutex.h>
+
+#include <atomic>
+#include <memory>
+#include <system_error>
+
+#include "FifoContext.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
+#include "rocketmq/SendReceipt.h"
+#include "spdlog/spdlog.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+void FifoProducerPartition::add(FifoContext&& context) {
+ {
+ absl::MutexLock lk(&messages_mtx_);
+ messages_.emplace_back(std::move(context));
+ SPDLOG_DEBUG("{} has {} pending messages after #add", name_, messages_.size());
+ }
+
+ trySend();
+}
+
+void FifoProducerPartition::trySend() {
+ bool expected = false;
+ if (inflight_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) {
+ absl::MutexLock lk(&messages_mtx_);
+
+ if (messages_.empty()) {
+ SPDLOG_DEBUG("There is no more messages to send");
+ return;
+ }
+
+ FifoContext& ctx = messages_.front();
+ MessageConstPtr message = std::move(ctx.message);
+ SendCallback send_callback = ctx.callback;
+
+ std::shared_ptr<FifoProducerPartition> partition = shared_from_this();
+ auto fifo_callback = [=](const std::error_code& ec, const SendReceipt& receipt) mutable {
+ partition->onComplete(ec, receipt, send_callback);
+ };
+ SPDLOG_DEBUG("Sending FIFO message from {}", name_);
+ producer_->send(std::move(message), fifo_callback);
+ messages_.pop_front();
+ SPDLOG_DEBUG("In addition to the inflight one, there is {} messages pending in {}", messages_.size(), name_);
+ } else {
+ SPDLOG_DEBUG("There is an inflight message");
+ }
+}
+
+void FifoProducerPartition::onComplete(const std::error_code& ec, const SendReceipt& receipt, SendCallback& callback) {
+ if (ec) {
+ SPDLOG_INFO("{} completed with a failure: {}", name_, ec.message());
+ } else {
+ SPDLOG_DEBUG("{} completed OK", name_);
+ }
+
+ if (!ec) {
+ callback(ec, receipt);
+ // update inflight status
+ bool expected = true;
+ if (inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) {
+ trySend();
+ } else {
+ SPDLOG_ERROR("{}: Unexpected inflight status", name_);
+ }
+ return;
+ }
+
+ // Put the message back to the front of the list
+ SendReceipt& receipt_mut = const_cast<SendReceipt&>(receipt);
+ FifoContext retry_context(std::move(receipt_mut.message), callback);
+ {
+ absl::MutexLock lk(&messages_mtx_);
+ messages_.emplace_front(std::move(retry_context));
+ }
+
+ // Update inflight status
+ bool expected = true;
+ if (inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) {
+ trySend();
+ } else {
+ SPDLOG_ERROR("Unexpected inflight status");
+ }
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp
index 78d812e..907d0a2 100644
--- a/cpp/source/rocketmq/Producer.cpp
+++ b/cpp/source/rocketmq/Producer.cpp
@@ -21,12 +21,8 @@
#include <system_error>
#include <utility>
-#include "rocketmq/Logger.h"
-#include "spdlog/spdlog.h"
-#include "MixAll.h"
#include "ProducerImpl.h"
#include "StaticNameServerResolver.h"
-#include "absl/strings/str_split.h"
#include "rocketmq/ErrorCode.h"
#include "rocketmq/SendReceipt.h"
#include "rocketmq/Transaction.h"
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp b/cpp/source/rocketmq/ProducerImpl.cpp
index 7313016..34975e7 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -17,38 +17,27 @@
#include "ProducerImpl.h"
#include <algorithm>
-#include <apache/rocketmq/v2/definition.pb.h>
-
#include <atomic>
#include <cassert>
#include <chrono>
-#include <limits>
#include <memory>
#include <system_error>
#include <utility>
-#include "Client.h"
-#include "MessageGroupQueueSelector.h"
-#include "MetadataConstants.h"
+#include "apache/rocketmq/v2/definition.pb.h"
#include "MixAll.h"
#include "Protocol.h"
#include "PublishInfoCallback.h"
-#include "RpcClient.h"
#include "SendContext.h"
-#include "SendMessageContext.h"
#include "Signature.h"
-#include "Tag.h"
#include "TracingUtility.h"
#include "TransactionImpl.h"
-#include "UniqueIdGenerator.h"
#include "UtilAll.h"
-#include "absl/strings/str_join.h"
#include "opencensus/trace/propagation/trace_context.h"
#include "opencensus/trace/span.h"
#include "rocketmq/ErrorCode.h"
#include "rocketmq/Message.h"
#include "rocketmq/SendReceipt.h"
-#include "rocketmq/Tracing.h"
#include "rocketmq/Transaction.h"
#include "rocketmq/TransactionChecker.h"
@@ -203,19 +192,28 @@
SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec) noexcept {
ensureRunning(ec);
if (ec) {
- return {};
+ SPDLOG_WARN("Producer is not running");
+ SendReceipt send_receipt{};
+ send_receipt.message = std::move(message);
+ return send_receipt;
}
auto topic_publish_info = getPublishInfo(message->topic());
if (!topic_publish_info) {
+ SPDLOG_WARN("Route of topic[{}] is not found", message->topic());
ec = ErrorCode::NotFound;
- return {};
+ SendReceipt send_receipt{};
+ send_receipt.message = std::move(message);
+ return send_receipt;
}
std::vector<rmq::MessageQueue> message_queue_list;
if (!topic_publish_info->selectMessageQueues(absl::make_optional<std::string>(), message_queue_list)) {
+ SPDLOG_WARN("Failed to select an addressable message queue for topic[{}]", message->topic());
ec = ErrorCode::NotFound;
- return {};
+ SendReceipt send_receipt{};
+ send_receipt.message = std::move(message);
+ return send_receipt;
}
auto mtx = std::make_shared<absl::Mutex>();
@@ -224,9 +222,10 @@
SendReceipt send_receipt;
// Define callback
- auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt& receipt) {
+ auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt& receipt) mutable {
ec = code;
- send_receipt = receipt;
+ SendReceipt& receipt_mut = const_cast<SendReceipt&>(receipt);
+ send_receipt.message = std::move(receipt_mut.message);
{
absl::MutexLock lk(mtx.get());
completed = true;
@@ -251,6 +250,7 @@
ensureRunning(ec);
if (ec) {
SendReceipt send_receipt;
+ send_receipt.message = std::move(message);
cb(ec, send_receipt);
}
@@ -264,6 +264,7 @@
// No route entries of the given topic is available
if (ec) {
SendReceipt send_receipt;
+ send_receipt.message = std::move(ptr);
cb(ec, send_receipt);
return;
}
@@ -271,6 +272,7 @@
if (!publish_info) {
std::error_code ec = ErrorCode::NotFound;
SendReceipt send_receipt;
+ send_receipt.message = std::move(ptr);
cb(ec, send_receipt);
return;
}
@@ -280,6 +282,7 @@
if (!publish_info->selectMessageQueues(ptr->group(), message_queue_list)) {
std::error_code ec = ErrorCode::NotFound;
SendReceipt send_receipt;
+ send_receipt.message = std::move(ptr);
cb(ec, send_receipt);
return;
}
@@ -338,12 +341,12 @@
Metadata metadata;
Signature::sign(client_config_, metadata);
- auto callback = [context](const std::error_code& ec, const SendReceipt& send_receipt) {
- if (ec) {
- context->onFailure(ec);
+ auto callback = [context](const SendResult& send_result) {
+ if (send_result.ec) {
+ context->onFailure(send_result.ec);
return;
}
- context->onSuccess(send_receipt);
+ context->onSuccess(send_result);
};
client_manager_->send(target, metadata, request, callback);
@@ -354,12 +357,14 @@
std::error_code ec;
validate(*message, ec);
if (ec) {
+ send_receipt.message = std::move(message);
callback(ec, send_receipt);
return;
}
if (list.empty()) {
ec = ErrorCode::NotFound;
+ send_receipt.message = std::move(message);
callback(ec, send_receipt);
return;
}
diff --git a/cpp/source/rocketmq/SendContext.cpp b/cpp/source/rocketmq/SendContext.cpp
index 385a1a9..bd97384 100644
--- a/cpp/source/rocketmq/SendContext.cpp
+++ b/cpp/source/rocketmq/SendContext.cpp
@@ -21,41 +21,44 @@
#include "ProducerImpl.h"
#include "PublishStats.h"
#include "Tag.h"
-#include "TransactionImpl.h"
-#include "opencensus/trace/propagation/trace_context.h"
#include "opencensus/trace/span.h"
-#include "rocketmq/Logger.h"
+#include "rocketmq/ErrorCode.h"
#include "rocketmq/SendReceipt.h"
#include "spdlog/spdlog.h"
ROCKETMQ_NAMESPACE_BEGIN
-void SendContext::onSuccess(const SendReceipt& send_receipt) noexcept {
+void SendContext::onSuccess(const SendResult& send_result) noexcept {
{
// Mark end of send-message span.
span_.SetStatus(opencensus::trace::StatusCode::OK);
span_.End();
}
- auto publisher = producer_.lock();
- if (!publisher) {
+ auto producer = producer_.lock();
+ if (!producer) {
+ SPDLOG_WARN("Producer has been destructed");
return;
}
// Collect metrics
{
auto duration = std::chrono::steady_clock::now() - request_time_;
- opencensus::stats::Record({{publisher->stats().latency(), MixAll::millisecondsOf(duration)}},
+ opencensus::stats::Record({{producer->stats().latency(), MixAll::millisecondsOf(duration)}},
{
{Tag::topicTag(), message_->topic()},
- {Tag::clientIdTag(), publisher->config().client_id},
+ {Tag::clientIdTag(), producer->config().client_id},
{Tag::invocationStatusTag(), "success"},
});
}
// send_receipt.traceContext(opencensus::trace::propagation::ToTraceParentHeader(span_.context()));
- std::error_code ec;
- callback_(ec, send_receipt);
+ SendReceipt send_receipt = {};
+ send_receipt.target = send_result.target;
+ send_receipt.message_id = send_result.message_id;
+ send_receipt.transaction_id = send_result.transaction_id;
+ send_receipt.message = std::move(message_);
+ callback_(send_result.ec, send_receipt);
}
void SendContext::onFailure(const std::error_code& ec) noexcept {
@@ -65,38 +68,36 @@
span_.End();
}
- auto publisher = producer_.lock();
- if (!publisher) {
+ auto producer = producer_.lock();
+ if (!producer) {
+ SPDLOG_WARN("Producer has been destructed");
return;
}
// Collect metrics
{
auto duration = std::chrono::steady_clock::now() - request_time_;
- opencensus::stats::Record({{publisher->stats().latency(), MixAll::millisecondsOf(duration)}},
+ opencensus::stats::Record({{producer->stats().latency(), MixAll::millisecondsOf(duration)}},
{
{Tag::topicTag(), message_->topic()},
- {Tag::clientIdTag(), publisher->config().client_id},
+ {Tag::clientIdTag(), producer->config().client_id},
{Tag::invocationStatusTag(), "failure"},
});
}
- if (++attempt_times_ >= publisher->maxAttemptTimes()) {
- SPDLOG_WARN("Retried {} times, which exceeds the limit: {}", attempt_times_, publisher->maxAttemptTimes());
- callback_(ec, {});
- return;
- }
-
- std::shared_ptr<ProducerImpl> producer = producer_.lock();
- if (!producer) {
- SPDLOG_WARN("Producer has been destructed");
- callback_(ec, {});
+ if (++attempt_times_ >= producer->maxAttemptTimes()) {
+ SPDLOG_WARN("Retried {} times, which exceeds the limit: {}", attempt_times_, producer->maxAttemptTimes());
+ SendReceipt receipt{};
+ receipt.message = std::move(message_);
+ callback_(ec, receipt);
return;
}
if (candidates_.empty()) {
SPDLOG_WARN("No alternative hosts to perform additional retries");
- callback_(ec, {});
+ SendReceipt receipt{};
+ receipt.message = std::move(message_);
+ callback_(ec, receipt);
return;
}
@@ -106,7 +107,7 @@
auto ctx = shared_from_this();
// If publish message requests are throttled, retry after backoff
if (ErrorCode::TooManyRequests == ec) {
- auto&& backoff = publisher->backoff(attempt_times_);
+ auto&& backoff = producer->backoff(attempt_times_);
SPDLOG_DEBUG("Publish message[topic={}, message-id={}] is throttled. Retry after {}ms", message_->topic(),
message_->id(), MixAll::millisecondsOf(backoff));
auto retry_cb = [=]() { producer->sendImpl(ctx); };
diff --git a/cpp/source/rocketmq/include/ClientImpl.h b/cpp/source/rocketmq/include/ClientImpl.h
index c266047..d769396 100644
--- a/cpp/source/rocketmq/include/ClientImpl.h
+++ b/cpp/source/rocketmq/include/ClientImpl.h
@@ -94,8 +94,8 @@
client_config_.request_timeout = absl::FromChrono(request_timeout);
}
- void withSsl(bool enable) {
- client_config_.withSsl = enable;
+ void withSsl(bool with_ssl) {
+ client_config_.withSsl = with_ssl;
}
/**
diff --git a/cpp/source/rocketmq/include/FifoContext.h b/cpp/source/rocketmq/include/FifoContext.h
new file mode 100644
index 0000000..55812ba
--- /dev/null
+++ b/cpp/source/rocketmq/include/FifoContext.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+struct FifoContext {
+ MessageConstPtr message;
+ SendCallback callback;
+
+ FifoContext(MessageConstPtr message, SendCallback callback);
+
+ FifoContext(FifoContext&& rhs) noexcept;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/FifoProducerImpl.h b/cpp/source/rocketmq/include/FifoProducerImpl.h
new file mode 100644
index 0000000..180c3f9
--- /dev/null
+++ b/cpp/source/rocketmq/include/FifoProducerImpl.h
@@ -0,0 +1,37 @@
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "FifoProducerPartition.h"
+#include "ProducerImpl.h"
+#include "fmt/format.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class FifoProducerImpl : std::enable_shared_from_this<FifoProducerImpl> {
+public:
+ FifoProducerImpl(std::shared_ptr<ProducerImpl> producer, std::size_t concurrency)
+ : producer_(producer), concurrency_(concurrency), partitions_(concurrency) {
+ for (auto i = 0; i < concurrency; i++) {
+ partitions_[i] = std::make_shared<FifoProducerPartition>(producer_, fmt::format("slot-{}", i));
+ }
+ };
+
+ void send(MessageConstPtr message, SendCallback callback);
+
+ std::shared_ptr<ProducerImpl>& internalProducer() {
+ return producer_;
+ }
+
+private:
+ std::shared_ptr<ProducerImpl> producer_;
+ std::vector<std::shared_ptr<FifoProducerPartition>> partitions_;
+ std::size_t concurrency_;
+ std::hash<std::string> hash_fn_;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/FifoProducerPartition.h b/cpp/source/rocketmq/include/FifoProducerPartition.h
new file mode 100644
index 0000000..96bb96f
--- /dev/null
+++ b/cpp/source/rocketmq/include/FifoProducerPartition.h
@@ -0,0 +1,39 @@
+#pragma once
+
+#include <absl/base/internal/thread_annotations.h>
+
+#include <atomic>
+#include <list>
+#include <memory>
+#include <system_error>
+
+#include "FifoContext.h"
+#include "ProducerImpl.h"
+#include "absl/base/thread_annotations.h"
+#include "absl/synchronization/mutex.h"
+#include "rocketmq/SendCallback.h"
+#include "rocketmq/SendReceipt.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class FifoProducerPartition : public std::enable_shared_from_this<FifoProducerPartition> {
+public:
+ FifoProducerPartition(std::shared_ptr<ProducerImpl> producer, std::string&& name)
+ : producer_(producer), name_(std::move(name)) {
+ }
+
+ void add(FifoContext&& context) LOCKS_EXCLUDED(messages_mtx_);
+
+ void trySend() LOCKS_EXCLUDED(messages_mtx_);
+
+ void onComplete(const std::error_code& ec, const SendReceipt& receipt, SendCallback& callback);
+
+private:
+ std::shared_ptr<ProducerImpl> producer_;
+ std::list<FifoContext> messages_ GUARDED_BY(messages_mtx_);
+ absl::Mutex messages_mtx_;
+ std::atomic_bool inflight_{false};
+ std::string name_;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/ProducerImpl.h b/cpp/source/rocketmq/include/ProducerImpl.h
index d7260a9..b572f20 100644
--- a/cpp/source/rocketmq/include/ProducerImpl.h
+++ b/cpp/source/rocketmq/include/ProducerImpl.h
@@ -16,28 +16,23 @@
*/
#pragma once
-#include <chrono>
#include <memory>
-#include <mutex>
#include <string>
#include <system_error>
#include "ClientImpl.h"
-#include "ClientManagerImpl.h"
#include "MixAll.h"
#include "PublishInfoCallback.h"
+#include "PublishStats.h"
#include "SendContext.h"
#include "TopicPublishInfo.h"
#include "TransactionImpl.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
-#include "absl/strings/string_view.h"
#include "rocketmq/Message.h"
#include "rocketmq/SendCallback.h"
#include "rocketmq/SendReceipt.h"
-#include "rocketmq/State.h"
#include "rocketmq/TransactionChecker.h"
-#include "PublishStats.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -53,8 +48,22 @@
void shutdown() override;
+ /**
+ * Note we requrie application to transfer ownership of the message to send to avoid concurrent modification during
+ * sent.
+ *
+ * Regardless of the send result, SendReceipt would have the std::unique_ptr<const Message>, facilliating
+ * application to conduct customized retry policy.
+ */
SendReceipt send(MessageConstPtr message, std::error_code& ec) noexcept;
+ /**
+ * Note we requrie application to transfer ownership of the message to send to avoid concurrent modification during
+ * sent.
+ *
+ * Regardless of the send result, SendReceipt would have the std::unique_ptr<const Message>, facilliating
+ * application to conduct customized retry policy.
+ */
void send(MessageConstPtr message, SendCallback callback);
void setTransactionChecker(TransactionChecker checker);
@@ -64,6 +73,13 @@
return absl::make_unique<TransactionImpl>(producer);
}
+ /**
+ * Note we requrie application to transfer ownership of the message to send to avoid concurrent modification during
+ * sent.
+ *
+ * TODO: Refine this API. Current API is not good enough as it cannot handle the message back to its caller on publish
+ * failure.
+ */
void send(MessageConstPtr message, std::error_code& ec, Transaction& transaction);
/**
diff --git a/cpp/source/rocketmq/include/SendContext.h b/cpp/source/rocketmq/include/SendContext.h
index 4c05ceb..4067532 100644
--- a/cpp/source/rocketmq/include/SendContext.h
+++ b/cpp/source/rocketmq/include/SendContext.h
@@ -19,16 +19,12 @@
#include <memory>
#include <system_error>
-#include "absl/container/flat_hash_map.h"
-#include "absl/synchronization/mutex.h"
-#include "opencensus/trace/span.h"
-
#include "Protocol.h"
+#include "SendResult.h"
#include "TransactionImpl.h"
-#include "rocketmq/ErrorCode.h"
+#include "opencensus/trace/span.h"
#include "rocketmq/Message.h"
#include "rocketmq/SendCallback.h"
-#include "rocketmq/SendReceipt.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -47,7 +43,7 @@
span_(opencensus::trace::Span::BlankSpan()) {
}
- void onSuccess(const SendReceipt& send_receipt) noexcept;
+ void onSuccess(const SendResult& send_result) noexcept;
void onFailure(const std::error_code& ec) noexcept;
diff --git a/cpp/tools/trouble_shooting.sh b/cpp/tools/trouble_shooting.sh
old mode 100644
new mode 100755