Fix group_name issue and virtual inheritance issue (#22)
* Fix include order
* Fix group-name issue
* Fix virtual inheritance issue
diff --git a/.bazelrc b/.bazelrc
index 5dc7579..e92ef36 100644
--- a/.bazelrc
+++ b/.bazelrc
@@ -127,4 +127,6 @@
build:coverage --strategy=TestRunner=sandboxed,local
build:coverage --strategy=CoverageReport=sandboxed,local
build:coverage --collect_code_coverage
-build:coverage --instrumentation_filter="//src/main/cpp[/:],//api[/:],-//src/main/cpp/base/mocks[/:],-//src/main/cpp/client/mocks[/:],-//src/main/cpp/rocketmq/mocks[/:]"
\ No newline at end of file
+build:coverage --instrumentation_filter="//src/main/cpp[/:],//api[/:],-//src/main/cpp/base/mocks[/:],-//src/main/cpp/client/mocks[/:],-//src/main/cpp/rocketmq/mocks[/:]"
+
+test --test_output=errors
\ No newline at end of file
diff --git a/api/rocketmq/AsyncCallback.h b/api/rocketmq/AsyncCallback.h
index 44d89cd..bb431a1 100644
--- a/api/rocketmq/AsyncCallback.h
+++ b/api/rocketmq/AsyncCallback.h
@@ -15,8 +15,11 @@
class SendCallback : public AsyncCallback {
public:
virtual ~SendCallback() = default;
+
virtual void onSuccess(SendResult& send_result) = 0;
+
virtual void onException(const MQException& e) = 0;
+
virtual SendCallbackType getSendCallbackType() { return SendCallbackType::noAutoDeleteSendCallback; }
};
diff --git a/api/rocketmq/CommunicationMode.h b/api/rocketmq/CommunicationMode.h
deleted file mode 100644
index ad29a49..0000000
--- a/api/rocketmq/CommunicationMode.h
+++ /dev/null
@@ -1,14 +0,0 @@
-#pragma once
-
-#include "rocketmq/RocketMQ.h"
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-enum CommunicationMode
-{
- ComMode_SYNC,
- ComMode_ASYNC,
- ComMode_ONEWAY
-};
-
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/api/rocketmq/Credentials.h b/api/rocketmq/Credentials.h
index 1b19a2f..33374f6 100644
--- a/api/rocketmq/Credentials.h
+++ b/api/rocketmq/Credentials.h
@@ -1,10 +1,11 @@
#pragma once
-#include "rocketmq/RocketMQ.h"
#include <chrono>
#include <string>
#include <utility>
+#include "rocketmq/RocketMQ.h"
+
ROCKETMQ_NAMESPACE_BEGIN
class Credentials {
diff --git a/api/rocketmq/CredentialsProvider.h b/api/rocketmq/CredentialsProvider.h
index 567220a..93ba8a2 100644
--- a/api/rocketmq/CredentialsProvider.h
+++ b/api/rocketmq/CredentialsProvider.h
@@ -1,11 +1,11 @@
#pragma once
-#include "Credentials.h"
-
-#include <mutex>
#include <chrono>
-#include <utility>
#include <memory>
+#include <mutex>
+#include <utility>
+
+#include "Credentials.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -44,6 +44,7 @@
static const char* ENVIRONMENT_ACCESS_KEY;
static const char* ENVIRONMENT_ACCESS_SECRET;
+
private:
std::string access_key_;
std::string access_secret_;
@@ -65,9 +66,8 @@
* For test purpose only.
* @return
*/
- static const char* credentialFile() {
- return CREDENTIAL_FILE_;
- }
+ static const char* credentialFile() { return CREDENTIAL_FILE_; }
+
private:
std::chrono::system_clock::duration refresh_interval_{std::chrono::seconds(10)};
std::string access_key_;
diff --git a/api/rocketmq/DefaultMQPushConsumer.h b/api/rocketmq/DefaultMQPushConsumer.h
index 176d909..f20f9c2 100644
--- a/api/rocketmq/DefaultMQPushConsumer.h
+++ b/api/rocketmq/DefaultMQPushConsumer.h
@@ -92,9 +92,10 @@
void setMessageModel(MessageModel message_model);
+ std::string groupName() const;
+
private:
std::shared_ptr<PushConsumerImpl> impl_;
- std::string group_name_;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/api/rocketmq/Executor.h b/api/rocketmq/Executor.h
index 1d78e82..ccd4e0f 100644
--- a/api/rocketmq/Executor.h
+++ b/api/rocketmq/Executor.h
@@ -1,8 +1,9 @@
#pragma once
-#include "RocketMQ.h"
#include <functional>
+#include "RocketMQ.h"
+
ROCKETMQ_NAMESPACE_BEGIN
using Executor = std::function<void(const std::function<void()>&)>;
diff --git a/api/rocketmq/ExpressionType.h b/api/rocketmq/ExpressionType.h
index dcafc24..93855af 100644
--- a/api/rocketmq/ExpressionType.h
+++ b/api/rocketmq/ExpressionType.h
@@ -1,6 +1,7 @@
#pragma once
#include <cstdint>
+
#include "RocketMQ.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/api/rocketmq/LocalTransactionStateChecker.h b/api/rocketmq/LocalTransactionStateChecker.h
index 6d0bef6..0badebe 100644
--- a/api/rocketmq/LocalTransactionStateChecker.h
+++ b/api/rocketmq/LocalTransactionStateChecker.h
@@ -1,8 +1,9 @@
#pragma once
+#include <memory>
+
#include "MQMessageExt.h"
#include "Transaction.h"
-#include <memory>
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/api/rocketmq/Logger.h b/api/rocketmq/Logger.h
index 24b0b34..dec8778 100644
--- a/api/rocketmq/Logger.h
+++ b/api/rocketmq/Logger.h
@@ -6,6 +6,7 @@
#include <mutex>
#include <string>
#include <cstdint>
+
#include "RocketMQ.h"
#ifndef SPDLOG_ACTIVE_LEVEL
diff --git a/api/rocketmq/MQClientException.h b/api/rocketmq/MQClientException.h
index 5c6c0f9..3b13854 100644
--- a/api/rocketmq/MQClientException.h
+++ b/api/rocketmq/MQClientException.h
@@ -11,7 +11,7 @@
class MQException : public std::exception {
public:
- MQException(const std::string& msg, int error, const char* file, int line) throw()
+ MQException(const std::string& msg, int error, const char* file, int line) noexcept
: m_error(error), m_line(line), m_file(file) {
try {
std::stringstream ss;
@@ -21,7 +21,7 @@
}
}
- MQException(const std::string& msg, int error, const char* file, const char* type, int line) throw()
+ MQException(const std::string& msg, int error, const char* file, const char* type, int line) noexcept
: m_error(error), m_line(line), m_file(file), m_type(type) {
try {
std::stringstream ss;
@@ -31,13 +31,13 @@
}
}
- virtual ~MQException() throw() {}
+ ~MQException() noexcept override = default;
- const char* what() const throw() { return m_msg.c_str(); }
+ const char* what() const noexcept override { return m_msg.c_str(); }
- int GetError() const throw() { return m_error; }
+ int GetError() const noexcept { return m_error; }
- virtual const char* GetType() const throw() { return m_type.c_str(); }
+ virtual const char* GetType() const noexcept { return m_type.c_str(); }
protected:
int m_error;
diff --git a/api/rocketmq/MQMessageExt.h b/api/rocketmq/MQMessageExt.h
index c814c8e..ad5942e 100644
--- a/api/rocketmq/MQMessageExt.h
+++ b/api/rocketmq/MQMessageExt.h
@@ -1,8 +1,9 @@
#pragma once
-#include "MQMessage.h"
#include <chrono>
+#include "MQMessage.h"
+
ROCKETMQ_NAMESPACE_BEGIN
class MessageAccessor;
diff --git a/api/rocketmq/MQSelector.h b/api/rocketmq/MQSelector.h
index 1131691..029c6f8 100644
--- a/api/rocketmq/MQSelector.h
+++ b/api/rocketmq/MQSelector.h
@@ -1,15 +1,17 @@
#pragma once
#include <vector>
-#include "MQMessageQueue.h"
+
#include "MQMessage.h"
+#include "MQMessageQueue.h"
ROCKETMQ_NAMESPACE_BEGIN
class MessageQueueSelector {
public:
- virtual ~MessageQueueSelector() {}
- virtual MQMessageQueue select(const std::vector<MQMessageQueue>& mqs, const MQMessage& msg, void* arg) = 0;
+ virtual ~MessageQueueSelector() = default;
+
+ virtual MQMessageQueue select(const std::vector<MQMessageQueue>& mqs, const MQMessage& msg, void* arg) = 0;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/api/rocketmq/MessageModel.h b/api/rocketmq/MessageModel.h
index 6f9f5a7..12fb37f 100644
--- a/api/rocketmq/MessageModel.h
+++ b/api/rocketmq/MessageModel.h
@@ -1,4 +1,5 @@
#pragma once
+
#include "RocketMQ.h"
#include <cstdint>
diff --git a/api/rocketmq/PullResult.h b/api/rocketmq/PullResult.h
index 473cae3..acd611e 100644
--- a/api/rocketmq/PullResult.h
+++ b/api/rocketmq/PullResult.h
@@ -1,9 +1,10 @@
#pragma once
-#include "MQMessageExt.h"
#include <cstdlib>
#include <vector>
+#include "MQMessageExt.h"
+
ROCKETMQ_NAMESPACE_BEGIN
class PullResult {
diff --git a/api/rocketmq/SendResult.h b/api/rocketmq/SendResult.h
index 9eba746..961d229 100644
--- a/api/rocketmq/SendResult.h
+++ b/api/rocketmq/SendResult.h
@@ -83,10 +83,10 @@
}
private:
- SendStatus send_status_;
+ SendStatus send_status_{SendStatus::SEND_OK};
std::string message_id_;
mutable MQMessageQueue message_queue_;
- long long queue_offset_;
+ long long queue_offset_{0};
std::string transaction_id_;
std::string region_id_;
std::string trace_context_;
diff --git a/example/rocketmq/ExamplePushConsumer.cpp b/example/rocketmq/ExamplePushConsumer.cpp
index 40c1c14..a319345 100644
--- a/example/rocketmq/ExamplePushConsumer.cpp
+++ b/example/rocketmq/ExamplePushConsumer.cpp
@@ -1,11 +1,14 @@
-#include "rocketmq/DefaultMQPushConsumer.h"
-#include "rocketmq/Logger.h"
-#include "spdlog/spdlog.h"
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>
+#include "rocketmq/Logger.h"
+
+#include "spdlog/spdlog.h"
+
+#include "rocketmq/DefaultMQPushConsumer.h"
+
using namespace rocketmq;
class SampleMQMessageListener : public StandardMessageListener {
@@ -13,6 +16,7 @@
ConsumeMessageResult consumeMessage(const std::vector<MQMessageExt>& msgs) override {
for (const MQMessageExt& msg : msgs) {
SPDLOG_INFO("Consume message[Topic={}, MessageId={}] OK", msg.getTopic(), msg.getMsgId());
+ std::cout << "Consume Message[MsgId=" << msg.getMsgId() << "] OK" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
return ConsumeMessageResult::SUCCESS;
@@ -25,20 +29,20 @@
logger.setLevel(Level::Debug);
logger.init();
- const char* cid = "GID_cpp_sdk_standard";
+ const char* group_id = "GID_cpp_sdk_standard";
const char* topic = "cpp_sdk_standard";
const char* resource_namespace = "MQ_INST_1080056302921134_BXuIbML7";
- DefaultMQPushConsumer push_consumer(cid);
+ DefaultMQPushConsumer push_consumer(group_id);
push_consumer.setResourceNamespace(resource_namespace);
push_consumer.setCredentialsProvider(std::make_shared<ConfigFileCredentialsProvider>());
push_consumer.setNamesrvAddr("47.98.116.189:80");
MessageListener* listener = new SampleMQMessageListener;
- push_consumer.setGroupName(cid);
push_consumer.setInstanceName("instance_0");
+ // push_consumer.setGroupName(group_id);
push_consumer.subscribe(topic, "*");
push_consumer.registerMessageListener(listener);
- push_consumer.setConsumeThreadCount(2);
+ push_consumer.setConsumeThreadCount(4);
push_consumer.start();
std::this_thread::sleep_for(std::chrono::seconds(300));
diff --git a/src/main/cpp/admin/include/ServerCall.h b/src/main/cpp/admin/include/ServerCall.h
index 213cf3c..bcb3ccc 100644
--- a/src/main/cpp/admin/include/ServerCall.h
+++ b/src/main/cpp/admin/include/ServerCall.h
@@ -1,9 +1,11 @@
#pragma once
-#include "apache/rocketmq/v1/admin.grpc.pb.h"
-#include "rocketmq/RocketMQ.h"
#include <cassert>
+#include "apache/rocketmq/v1/admin.grpc.pb.h"
+
+#include "rocketmq/RocketMQ.h"
+
namespace rmq = apache::rocketmq::v1;
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/base/include/Histogram.h b/src/main/cpp/base/include/Histogram.h
index 573bf7c..e62d554 100644
--- a/src/main/cpp/base/include/Histogram.h
+++ b/src/main/cpp/base/include/Histogram.h
@@ -5,6 +5,7 @@
#include <memory>
#include <string>
#include <vector>
+
#include "rocketmq/RocketMQ.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/base/include/HostInfo.h b/src/main/cpp/base/include/HostInfo.h
index 68b8bfa..301970f 100644
--- a/src/main/cpp/base/include/HostInfo.h
+++ b/src/main/cpp/base/include/HostInfo.h
@@ -1,8 +1,9 @@
#pragma once
-#include "rocketmq/RocketMQ.h"
#include <string>
+#include "rocketmq/RocketMQ.h"
+
ROCKETMQ_NAMESPACE_BEGIN
struct HostInfo {
std::string site_;
diff --git a/src/main/cpp/base/include/HttpClientImpl.h b/src/main/cpp/base/include/HttpClientImpl.h
index 8e42a33..e8f37ae 100644
--- a/src/main/cpp/base/include/HttpClientImpl.h
+++ b/src/main/cpp/base/include/HttpClientImpl.h
@@ -1,12 +1,11 @@
#pragma once
-#include "HttpClient.h"
-
#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_map.h"
#include "absl/synchronization/mutex.h"
#include "httplib.h"
+#include "HttpClient.h"
#include "rocketmq/RocketMQ.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/base/include/InvocationContext.h b/src/main/cpp/base/include/InvocationContext.h
index 32ef5a7..276bb64 100644
--- a/src/main/cpp/base/include/InvocationContext.h
+++ b/src/main/cpp/base/include/InvocationContext.h
@@ -1,18 +1,20 @@
#pragma once
-#include "LoggerImpl.h"
-#include "MetadataConstants.h"
-#include "UniqueIdGenerator.h"
-#include "absl/synchronization/mutex.h"
-#include "absl/time/time.h"
-#include "rocketmq/RocketMQ.h"
#include <atomic>
#include <ctime>
#include <functional>
-#include <grpcpp/client_context.h>
-#include <grpcpp/grpcpp.h>
-#include <grpcpp/impl/codegen/async_stream.h>
-#include <grpcpp/impl/codegen/async_unary_call.h>
+
+#include "absl/synchronization/mutex.h"
+#include "absl/time/time.h"
+#include "grpcpp/client_context.h"
+#include "grpcpp/grpcpp.h"
+#include "grpcpp/impl/codegen/async_stream.h"
+#include "grpcpp/impl/codegen/async_unary_call.h"
+
+#include "LoggerImpl.h"
+#include "MetadataConstants.h"
+#include "UniqueIdGenerator.h"
+#include "rocketmq/RocketMQ.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/base/include/MessageAccessor.h b/src/main/cpp/base/include/MessageAccessor.h
index 9f6ca75..9e1b3b8 100644
--- a/src/main/cpp/base/include/MessageAccessor.h
+++ b/src/main/cpp/base/include/MessageAccessor.h
@@ -1,7 +1,6 @@
#pragma once
#include "rocketmq/MQMessageExt.h"
-#include "rocketmq/RocketMQ.h"
#include "Protocol.h"
@@ -28,9 +27,9 @@
static void setDeliveryAttempt(MQMessageExt& message, int32_t attempt_times);
- static void setDecodedTimestamp(MQMessageExt &message, absl::Time decode_timestamp);
- static absl::Time decodedTimestamp(const MQMessageExt &message);
-
+ static void setDecodedTimestamp(MQMessageExt& message, absl::Time decode_timestamp);
+ static absl::Time decodedTimestamp(const MQMessageExt& message);
+
static void setInvisiblePeriod(MQMessageExt& message, absl::Duration invisible_period);
static void setReceiptHandle(MQMessageExt& message, std::string receipt_handle);
diff --git a/src/main/cpp/base/include/MessageImpl.h b/src/main/cpp/base/include/MessageImpl.h
index 36d3d74..8572a07 100644
--- a/src/main/cpp/base/include/MessageImpl.h
+++ b/src/main/cpp/base/include/MessageImpl.h
@@ -1,5 +1,7 @@
#pragma once
+#include <map>
+
#include "Protocol.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/base/include/MetadataConstants.h b/src/main/cpp/base/include/MetadataConstants.h
index bb5c2c9..a9dde76 100644
--- a/src/main/cpp/base/include/MetadataConstants.h
+++ b/src/main/cpp/base/include/MetadataConstants.h
@@ -1,4 +1,5 @@
#pragma once
+
#include "rocketmq/RocketMQ.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/base/include/MixAll.h b/src/main/cpp/base/include/MixAll.h
index 11aac59..0eb5e72 100644
--- a/src/main/cpp/base/include/MixAll.h
+++ b/src/main/cpp/base/include/MixAll.h
@@ -1,13 +1,14 @@
#pragma once
-#include "absl/strings/string_view.h"
-#include "re2/re2.h"
-#include "rocketmq/MQMessage.h"
-
#include <chrono>
#include <cstdint>
#include <string>
+#include "absl/strings/string_view.h"
+#include "re2/re2.h"
+
+#include "rocketmq/MQMessage.h"
+
ROCKETMQ_NAMESPACE_BEGIN
class MixAll {
diff --git a/src/main/cpp/base/include/RateLimiter.h b/src/main/cpp/base/include/RateLimiter.h
index 8059398..060bbd0 100644
--- a/src/main/cpp/base/include/RateLimiter.h
+++ b/src/main/cpp/base/include/RateLimiter.h
@@ -1,6 +1,5 @@
#pragma once
-#include "rocketmq/RocketMQ.h"
#include <array>
#include <atomic>
#include <condition_variable>
@@ -10,6 +9,8 @@
#include <thread>
#include <vector>
+#include "rocketmq/RocketMQ.h"
+
ROCKETMQ_NAMESPACE_BEGIN
class Tick {
diff --git a/src/main/cpp/base/include/ThreadPool.h b/src/main/cpp/base/include/ThreadPool.h
index 4e6044e..967ad26 100644
--- a/src/main/cpp/base/include/ThreadPool.h
+++ b/src/main/cpp/base/include/ThreadPool.h
@@ -1,8 +1,9 @@
#pragma once
-#include "rocketmq/RocketMQ.h"
#include <functional>
+#include "rocketmq/RocketMQ.h"
+
ROCKETMQ_NAMESPACE_BEGIN
class ThreadPool {
diff --git a/src/main/cpp/base/include/ThreadPoolImpl.h b/src/main/cpp/base/include/ThreadPoolImpl.h
index 4309006..fad9204 100644
--- a/src/main/cpp/base/include/ThreadPoolImpl.h
+++ b/src/main/cpp/base/include/ThreadPoolImpl.h
@@ -1,10 +1,5 @@
#pragma once
-#include "ThreadPool.h"
-#include "absl/synchronization/mutex.h"
-#include "asio/io_context.hpp"
-#include "rocketmq/RocketMQ.h"
-#include "rocketmq/State.h"
#include <atomic>
#include <cstdint>
#include <functional>
@@ -12,13 +7,18 @@
#include <thread>
#include <vector>
+#include "absl/synchronization/mutex.h"
#include "asio.hpp"
+#include "asio/io_context.hpp"
+
+#include "ThreadPool.h"
+#include "rocketmq/State.h"
ROCKETMQ_NAMESPACE_BEGIN
class ThreadPoolImpl : public ThreadPool {
public:
- ThreadPoolImpl(std::uint16_t workers);
+ explicit ThreadPoolImpl(std::uint16_t workers);
~ThreadPoolImpl() override = default;
diff --git a/src/main/cpp/base/include/UniqueIdGenerator.h b/src/main/cpp/base/include/UniqueIdGenerator.h
index e72cb7e..ec6cf50 100644
--- a/src/main/cpp/base/include/UniqueIdGenerator.h
+++ b/src/main/cpp/base/include/UniqueIdGenerator.h
@@ -1,13 +1,14 @@
#pragma once
-#include "rocketmq/RocketMQ.h"
-#include "absl/base/thread_annotations.h"
-#include "absl/synchronization/mutex.h"
-
#include <chrono>
#include <cstdint>
#include <string>
+#include "absl/base/thread_annotations.h"
+#include "absl/synchronization/mutex.h"
+
+#include "rocketmq/RocketMQ.h"
+
ROCKETMQ_NAMESPACE_BEGIN
class UniqueIdGenerator {
diff --git a/src/main/cpp/client/ClientConfigImpl.cpp b/src/main/cpp/client/ClientConfigImpl.cpp
index 650305e..ff71156 100644
--- a/src/main/cpp/client/ClientConfigImpl.cpp
+++ b/src/main/cpp/client/ClientConfigImpl.cpp
@@ -1,11 +1,13 @@
#include "ClientConfigImpl.h"
-#include "UtilAll.h"
+
#include <sstream>
#ifndef _WIN32
#include <unistd.h>
#endif
+#include "UtilAll.h"
+
ROCKETMQ_NAMESPACE_BEGIN
#ifndef CLIENT_VERSION_MAJOR
@@ -22,10 +24,9 @@
const char* ClientConfigImpl::CLIENT_VERSION = CLIENT_VERSION_MAJOR "." CLIENT_VERSION_MINOR "." CLIENT_VERSION_PATCH;
-ClientConfigImpl::ClientConfigImpl() : ClientConfigImpl(std::string()) {}
-
-ClientConfigImpl::ClientConfigImpl(std::string group_name)
- : group_name_(std::move(group_name)), io_timeout_(absl::Seconds(3)), long_polling_timeout_(absl::Seconds(30)) {
+ClientConfigImpl::ClientConfigImpl(absl::string_view group_name)
+ : group_name_(group_name.data(), group_name.length()), io_timeout_(absl::Seconds(3)),
+ long_polling_timeout_(absl::Seconds(30)) {
instance_name_ = "DEFAULT";
}
diff --git a/src/main/cpp/client/include/Assignment.h b/src/main/cpp/client/include/Assignment.h
index 17cfdb3..220c6ad 100644
--- a/src/main/cpp/client/include/Assignment.h
+++ b/src/main/cpp/client/include/Assignment.h
@@ -1,18 +1,18 @@
#pragma once
-#include "ConsumeMessageType.h"
-#include "rocketmq/MQMessageQueue.h"
#include <map>
#include <string>
+#include "ConsumeMessageType.h"
+#include "rocketmq/MQMessageQueue.h"
+
ROCKETMQ_NAMESPACE_BEGIN
class Assignment {
public:
- Assignment(MQMessageQueue message_queue)
- : message_queue_(std::move(message_queue)) {}
+ Assignment(MQMessageQueue message_queue) : message_queue_(std::move(message_queue)) {}
- bool operator==(const Assignment &rhs) const {
+ bool operator==(const Assignment& rhs) const {
if (this == &rhs) {
return true;
}
@@ -20,11 +20,9 @@
return message_queue_ == rhs.message_queue_;
}
- bool operator<(const Assignment &rhs) const {
- return message_queue_ < rhs.message_queue_;
- }
+ bool operator<(const Assignment& rhs) const { return message_queue_ < rhs.message_queue_; }
- const MQMessageQueue &messageQueue() const { return message_queue_; }
+ const MQMessageQueue& messageQueue() const { return message_queue_; }
private:
MQMessageQueue message_queue_;
diff --git a/src/main/cpp/client/include/Broker.h b/src/main/cpp/client/include/Broker.h
index 5c40c48..1fc17e1 100644
--- a/src/main/cpp/client/include/Broker.h
+++ b/src/main/cpp/client/include/Broker.h
@@ -1,9 +1,10 @@
#pragma once
-#include "ServiceAddress.h"
#include <memory>
#include <vector>
+#include "ServiceAddress.h"
+
ROCKETMQ_NAMESPACE_BEGIN
class Broker {
diff --git a/src/main/cpp/client/include/Client.h b/src/main/cpp/client/include/Client.h
index 847dac9..d029598 100644
--- a/src/main/cpp/client/include/Client.h
+++ b/src/main/cpp/client/include/Client.h
@@ -1,9 +1,11 @@
#pragma once
-#include "ClientConfig.h"
-#include "absl/container/flat_hash_set.h"
#include <functional>
+#include "absl/container/flat_hash_set.h"
+
+#include "ClientConfig.h"
+
ROCKETMQ_NAMESPACE_BEGIN
class Client : virtual public ClientConfig {
diff --git a/src/main/cpp/client/include/ClientConfig.h b/src/main/cpp/client/include/ClientConfig.h
index b5ed3d1..ec3f529 100644
--- a/src/main/cpp/client/include/ClientConfig.h
+++ b/src/main/cpp/client/include/ClientConfig.h
@@ -1,10 +1,11 @@
#pragma once
-#include "rocketmq/CredentialsProvider.h"
-#include "absl/time/time.h"
-#include "rocketmq/RocketMQ.h"
#include <string>
+#include "absl/time/time.h"
+
+#include "rocketmq/CredentialsProvider.h"
+
ROCKETMQ_NAMESPACE_BEGIN
class ClientConfig {
diff --git a/src/main/cpp/client/include/ClientConfigImpl.h b/src/main/cpp/client/include/ClientConfigImpl.h
index f106d58..d45392d 100644
--- a/src/main/cpp/client/include/ClientConfigImpl.h
+++ b/src/main/cpp/client/include/ClientConfigImpl.h
@@ -1,49 +1,55 @@
#pragma once
-#include "ClientConfig.h"
-#include "absl/strings/string_view.h"
-#include "absl/time/time.h"
-#include "rocketmq/RocketMQ.h"
#include <atomic>
#include <chrono>
#include <memory>
#include <string>
#include <vector>
+#include "absl/strings/string_view.h"
+#include "absl/time/time.h"
+
+#include "ClientConfig.h"
+
ROCKETMQ_NAMESPACE_BEGIN
class ClientConfigImpl : virtual public ClientConfig {
public:
- ClientConfigImpl();
-
- explicit ClientConfigImpl(std::string group_name);
+ explicit ClientConfigImpl(absl::string_view group_name);
~ClientConfigImpl() override = default;
- const std::string& resourceNamespace() const override { return resource_namespace_; }
+ const std::string &resourceNamespace() const override {
+ return resource_namespace_;
+ }
void resourceNamespace(absl::string_view resource_namespace) {
- resource_namespace_ = std::string(resource_namespace.data(), resource_namespace.length());
+ resource_namespace_ =
+ std::string(resource_namespace.data(), resource_namespace.length());
}
std::string clientId() const override;
- const std::string& getInstanceName() const;
+ const std::string &getInstanceName() const;
void setInstanceName(std::string instance_name);
- const std::string& getGroupName() const override;
+ const std::string &getGroupName() const override;
void setGroupName(std::string group_name);
- const std::string& getUnitName() const { return unit_name_; }
+ const std::string &getUnitName() const { return unit_name_; }
void setUnitName(std::string unit_name) { unit_name_ = std::move(unit_name); }
absl::Duration getIoTimeout() const override;
void setIoTimeout(absl::Duration timeout);
- absl::Duration getLongPollingTimeout() const override { return long_polling_timeout_; }
+ absl::Duration getLongPollingTimeout() const override {
+ return long_polling_timeout_;
+ }
- void setLongPollingTimeout(absl::Duration timeout) { long_polling_timeout_ = timeout; }
+ void setLongPollingTimeout(absl::Duration timeout) {
+ long_polling_timeout_ = timeout;
+ }
bool isTracingEnabled() { return enable_tracing_.load(); }
void enableTracing(bool enabled) { enable_tracing_.store(enabled); }
@@ -51,16 +57,18 @@
CredentialsProviderPtr credentialsProvider() override;
void setCredentialsProvider(CredentialsProviderPtr credentials_provider);
- void serviceName(std::string service_name) { service_name_ = std::move(service_name); }
- const std::string& serviceName() const override { return service_name_; }
+ void serviceName(std::string service_name) {
+ service_name_ = std::move(service_name);
+ }
+ const std::string &serviceName() const override { return service_name_; }
void region(std::string region) { region_ = std::move(region); }
- const std::string& region() const override { return region_; }
+ const std::string ®ion() const override { return region_; }
void tenantId(std::string tenant_id) { tenant_id_ = std::move(tenant_id); }
- const std::string& tenantId() const override { return tenant_id_; }
+ const std::string &tenantId() const override { return tenant_id_; }
- static const char* CLIENT_VERSION;
+ static const char *CLIENT_VERSION;
protected:
/**
@@ -74,7 +82,8 @@
std::string region_;
/**
- * RocketMQ instance namespace, in which topic, consumer group and any other abstract resources remain unique.
+ * RocketMQ instance namespace, in which topic, consumer group and any other
+ * abstract resources remain unique.
*/
std::string resource_namespace_;
diff --git a/src/main/cpp/client/include/ClientManager.h b/src/main/cpp/client/include/ClientManager.h
index 8ebb2eb..b98a6d3 100644
--- a/src/main/cpp/client/include/ClientManager.h
+++ b/src/main/cpp/client/include/ClientManager.h
@@ -1,5 +1,8 @@
#pragma once
+#include <chrono>
+#include <memory>
+
#include "Client.h"
#include "ReceiveMessageCallback.h"
#include "RpcClient.h"
@@ -7,8 +10,6 @@
#include "TopAddressing.h"
#include "TopicRouteData.h"
#include "rocketmq/MQMessageExt.h"
-#include <chrono>
-#include <memory>
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/client/include/ClientManagerFactory.h b/src/main/cpp/client/include/ClientManagerFactory.h
index 98405f3..d1baf85 100644
--- a/src/main/cpp/client/include/ClientManagerFactory.h
+++ b/src/main/cpp/client/include/ClientManagerFactory.h
@@ -3,11 +3,12 @@
#include <string>
#include <thread>
-#include "ClientConfig.h"
-#include "ClientManager.h"
#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_map.h"
#include "absl/synchronization/mutex.h"
+
+#include "ClientConfig.h"
+#include "ClientManager.h"
#include "rocketmq/AdminServer.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/client/include/ClientManagerImpl.h b/src/main/cpp/client/include/ClientManagerImpl.h
index 050ad91..ba56c47 100644
--- a/src/main/cpp/client/include/ClientManagerImpl.h
+++ b/src/main/cpp/client/include/ClientManagerImpl.h
@@ -1,5 +1,19 @@
#pragma once
+#include <atomic>
+#include <chrono>
+#include <cstdint>
+#include <functional>
+#include <future>
+#include <string>
+#include <vector>
+
+#include "absl/base/thread_annotations.h"
+#include "absl/container/flat_hash_map.h"
+#include "absl/container/flat_hash_set.h"
+#include "absl/strings/string_view.h"
+#include "absl/synchronization/mutex.h"
+
#include "Client.h"
#include "ClientManager.h"
#include "HeartbeatDataCallback.h"
@@ -15,21 +29,8 @@
#include "TopAddressing.h"
#include "TopicRouteChangeCallback.h"
#include "TopicRouteData.h"
-#include "absl/base/thread_annotations.h"
-#include "absl/container/flat_hash_map.h"
-#include "absl/container/flat_hash_set.h"
-#include "absl/strings/string_view.h"
-#include "absl/synchronization/mutex.h"
#include "rocketmq/AsyncCallback.h"
-#include "rocketmq/CommunicationMode.h"
#include "rocketmq/State.h"
-#include <atomic>
-#include <chrono>
-#include <cstdint>
-#include <functional>
-#include <future>
-#include <string>
-#include <vector>
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/client/include/ConsumeMessageType.h b/src/main/cpp/client/include/ConsumeMessageType.h
index 3f649ad..e57b5b3 100644
--- a/src/main/cpp/client/include/ConsumeMessageType.h
+++ b/src/main/cpp/client/include/ConsumeMessageType.h
@@ -1,9 +1,9 @@
#pragma once
-#include "rocketmq/RocketMQ.h"
-
#include <cstdint>
+#include "rocketmq/RocketMQ.h"
+
ROCKETMQ_NAMESPACE_BEGIN
enum class ConsumeMessageType : int8_t {
diff --git a/src/main/cpp/client/include/HeartbeatDataCallback.h b/src/main/cpp/client/include/HeartbeatDataCallback.h
index 9be195d..f105c70 100644
--- a/src/main/cpp/client/include/HeartbeatDataCallback.h
+++ b/src/main/cpp/client/include/HeartbeatDataCallback.h
@@ -1,7 +1,6 @@
#pragma once
#include "RpcClient.h"
-#include "rocketmq/RocketMQ.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/client/include/InterceptorContinuation.h b/src/main/cpp/client/include/InterceptorContinuation.h
index 3990386..05a73d2 100644
--- a/src/main/cpp/client/include/InterceptorContinuation.h
+++ b/src/main/cpp/client/include/InterceptorContinuation.h
@@ -1,6 +1,7 @@
#pragma once
#include "grpcpp/impl/codegen/interceptor.h"
+
#include "rocketmq/RocketMQ.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/client/include/LogInterceptor.h b/src/main/cpp/client/include/LogInterceptor.h
index f270a64..a14bbf0 100644
--- a/src/main/cpp/client/include/LogInterceptor.h
+++ b/src/main/cpp/client/include/LogInterceptor.h
@@ -1,4 +1,5 @@
#include "grpcpp/impl/codegen/client_interceptor.h"
+
#include "rocketmq/RocketMQ.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/client/include/LogInterceptorFactory.h b/src/main/cpp/client/include/LogInterceptorFactory.h
index c356090..85c0719 100644
--- a/src/main/cpp/client/include/LogInterceptorFactory.h
+++ b/src/main/cpp/client/include/LogInterceptorFactory.h
@@ -1,6 +1,7 @@
#pragma once
#include "grpcpp/impl/codegen/client_interceptor.h"
+
#include "rocketmq/RocketMQ.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/client/include/OrphanTransactionCallback.h b/src/main/cpp/client/include/OrphanTransactionCallback.h
index 356fd64..b320650 100644
--- a/src/main/cpp/client/include/OrphanTransactionCallback.h
+++ b/src/main/cpp/client/include/OrphanTransactionCallback.h
@@ -1,7 +1,6 @@
#pragma once
#include "rocketmq/MQMessage.h"
-#include "rocketmq/RocketMQ.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/client/include/ReceiveMessageResult.h b/src/main/cpp/client/include/ReceiveMessageResult.h
index 026f5fd..badf8e1 100644
--- a/src/main/cpp/client/include/ReceiveMessageResult.h
+++ b/src/main/cpp/client/include/ReceiveMessageResult.h
@@ -1,13 +1,13 @@
#pragma once
-#include "MixAll.h"
-#include "absl/time/time.h"
-#include "rocketmq/MQMessageExt.h"
-
#include <cstdlib>
#include <sstream>
#include <utility>
+#include "MixAll.h"
+#include "absl/time/time.h"
+#include "rocketmq/MQMessageExt.h"
+
ROCKETMQ_NAMESPACE_BEGIN
enum class ReceiveMessageStatus : int32_t {
diff --git a/src/main/cpp/client/include/RpcClient.h b/src/main/cpp/client/include/RpcClient.h
index e9b9d71..416001e 100644
--- a/src/main/cpp/client/include/RpcClient.h
+++ b/src/main/cpp/client/include/RpcClient.h
@@ -1,19 +1,19 @@
#pragma once
-#include "apache/rocketmq/v1/service.pb.h"
-#include "grpcpp/grpcpp.h"
-
-#include "InvocationContext.h"
-#include "OrphanTransactionCallback.h"
+#include <chrono>
+#include <iostream>
+#include <memory>
+#include <string>
#include "absl/container/flat_hash_map.h"
#include "absl/strings/string_view.h"
#include "apache/rocketmq/v1/definition.grpc.pb.h"
#include "apache/rocketmq/v1/service.grpc.pb.h"
-#include <chrono>
-#include <iostream>
-#include <memory>
-#include <string>
+#include "apache/rocketmq/v1/service.pb.h"
+#include "grpcpp/grpcpp.h"
+
+#include "InvocationContext.h"
+#include "OrphanTransactionCallback.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/client/include/RpcClientImpl.h b/src/main/cpp/client/include/RpcClientImpl.h
index cc0439d..fbdbe40 100644
--- a/src/main/cpp/client/include/RpcClientImpl.h
+++ b/src/main/cpp/client/include/RpcClientImpl.h
@@ -1,9 +1,10 @@
#pragma once
-#include "RpcClient.h"
+#include <memory>
#include "absl/container/flat_hash_map.h"
-#include <memory>
+
+#include "RpcClient.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/client/include/SendMessageContext.h b/src/main/cpp/client/include/SendMessageContext.h
index f3a19dc..a54bb8e 100644
--- a/src/main/cpp/client/include/SendMessageContext.h
+++ b/src/main/cpp/client/include/SendMessageContext.h
@@ -1,8 +1,9 @@
#pragma once
+
+#include <string>
+
#include "rocketmq/MQMessage.h"
#include "rocketmq/MQMessageQueue.h"
-#include "rocketmq/RocketMQ.h"
-#include <string>
ROCKETMQ_NAMESPACE_BEGIN
class SendMessageContext {
diff --git a/src/main/cpp/client/include/ServiceAddress.h b/src/main/cpp/client/include/ServiceAddress.h
index 7c28d67..067e734 100644
--- a/src/main/cpp/client/include/ServiceAddress.h
+++ b/src/main/cpp/client/include/ServiceAddress.h
@@ -1,11 +1,13 @@
#pragma once
-#include "absl/strings/string_view.h"
-#include "rocketmq/RocketMQ.h"
#include <cstdint>
#include <string>
#include <vector>
+#include "absl/strings/string_view.h"
+
+#include "rocketmq/RocketMQ.h"
+
ROCKETMQ_NAMESPACE_BEGIN
enum AddressScheme : int8_t {
diff --git a/src/main/cpp/client/include/Signature.h b/src/main/cpp/client/include/Signature.h
index f617ecc..7d9c0c4 100644
--- a/src/main/cpp/client/include/Signature.h
+++ b/src/main/cpp/client/include/Signature.h
@@ -1,6 +1,9 @@
-#include "ClientConfig.h"
+#pragma once
+
#include "absl/container/flat_hash_map.h"
+#include "ClientConfig.h"
+
ROCKETMQ_NAMESPACE_BEGIN
class Signature {
diff --git a/src/main/cpp/client/include/Topic.h b/src/main/cpp/client/include/Topic.h
index 8778d92..360f794 100644
--- a/src/main/cpp/client/include/Topic.h
+++ b/src/main/cpp/client/include/Topic.h
@@ -1,9 +1,10 @@
#pragma once
-#include "rocketmq/RocketMQ.h"
#include <string>
#include <unordered_map>
+#include "rocketmq/RocketMQ.h"
+
ROCKETMQ_NAMESPACE_BEGIN
class Topic {
diff --git a/src/main/cpp/client/include/TopicAssignmentInfo.h b/src/main/cpp/client/include/TopicAssignmentInfo.h
index 9581afb..41ec336 100644
--- a/src/main/cpp/client/include/TopicAssignmentInfo.h
+++ b/src/main/cpp/client/include/TopicAssignmentInfo.h
@@ -1,10 +1,11 @@
#pragma once
-#include "Assignment.h"
-#include "RpcClient.h"
#include <atomic>
#include <vector>
+#include "Assignment.h"
+#include "RpcClient.h"
+
ROCKETMQ_NAMESPACE_BEGIN
class TopicAssignment {
diff --git a/src/main/cpp/client/include/TopicPublishInfo.h b/src/main/cpp/client/include/TopicPublishInfo.h
index d07e495..d4b18dd 100644
--- a/src/main/cpp/client/include/TopicPublishInfo.h
+++ b/src/main/cpp/client/include/TopicPublishInfo.h
@@ -2,11 +2,12 @@
#include <vector>
-#include "TopicRouteData.h"
-#include "absl/container/flat_hash_set.h"
#include "absl/base/thread_annotations.h"
+#include "absl/container/flat_hash_set.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
+
+#include "TopicRouteData.h"
#include "rocketmq/MQMessageQueue.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/client/include/TopicRouteChangeCallback.h b/src/main/cpp/client/include/TopicRouteChangeCallback.h
index 1415586..0a8829e 100644
--- a/src/main/cpp/client/include/TopicRouteChangeCallback.h
+++ b/src/main/cpp/client/include/TopicRouteChangeCallback.h
@@ -1,12 +1,11 @@
#pragma once
-#include "TopicRouteData.h"
#include "absl/strings/string_view.h"
+#include "TopicRouteData.h"
+
ROCKETMQ_NAMESPACE_BEGIN
-
-
class TopicRouteChangeCallback {
public:
virtual void onTopicRouteChange(absl::string_view topic, const TopicRouteDataPtr& topic_route_data) = 0;
diff --git a/src/main/cpp/client/include/TopicRouteData.h b/src/main/cpp/client/include/TopicRouteData.h
index 4719ef4..2de0c83 100644
--- a/src/main/cpp/client/include/TopicRouteData.h
+++ b/src/main/cpp/client/include/TopicRouteData.h
@@ -1,10 +1,11 @@
#pragma once
-#include "Partition.h"
-#include "RpcClient.h"
#include <algorithm>
#include <vector>
+#include "Partition.h"
+#include "RpcClient.h"
+
ROCKETMQ_NAMESPACE_BEGIN
namespace rmq = apache::rocketmq::v1;
diff --git a/src/main/cpp/client/include/TracingUtility.h b/src/main/cpp/client/include/TracingUtility.h
deleted file mode 100644
index c69a98f..0000000
--- a/src/main/cpp/client/include/TracingUtility.h
+++ /dev/null
@@ -1,59 +0,0 @@
-#pragma once
-#ifdef ENABLE_TRACING
-#include "opentelemetry/trace/span_context.h"
-#include "rocketmq/RocketMQ.h"
-#include <iostream>
-#include <string>
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-namespace trace = opentelemetry::trace;
-
-class TracingUtility {
-public:
- static TracingUtility& get();
-
- static const int kTraceDelimiterBytes = 3;
- // 0: version, 1: trace id, 2: span id, 3: trace flags
- constexpr static const int kHeaderElementLengths[4] = {2, 32, 16, 2};
- static const int kHeaderSize = kHeaderElementLengths[0] + kHeaderElementLengths[1] + kHeaderElementLengths[2] +
- kHeaderElementLengths[3] + kTraceDelimiterBytes;
-
- static const int kTraceStateMaxMembers = 32;
- static const int kVersionBytes = 2;
- static const int kTraceIdBytes = 32;
- static const int kSpanIdBytes = 16;
- static const int kTraceFlagBytes = 2;
-
- static const std::string INVALID_TRACE_ID;
- static const std::string INVALID_SPAN_ID;
-
- const std::string topic_ = "topic";
- const std::string consumer_group_ = "consumer_group";
- const std::string msg_id_ = "msg_id";
- const std::string tags_ = "tags";
- const std::string store_host_ = "store_host";
- const std::string success_ = "success";
- const std::string max_attempt_times_ = "retry_time";
- const std::string expired_ = "expired";
-
- static std::string injectSpanContextToTraceParent(const trace::SpanContext& span_context);
-
- static trace::SpanContext extractContextFromTraceParent(const std::string& trace_parent);
-
-private:
- static void generateHexFromString(const std::string& string, int bytes, uint8_t* buf);
-
- static uint8_t hexToInt(char c);
-
- static trace::TraceId generateTraceIdFromString(const std::string& trace_id);
-
- static bool isValidHex(const std::string& str);
-
- static trace::SpanId generateSpanIdFromString(const std::string& span_id);
-
- static trace::TraceFlags generateTraceFlagsFromString(std::string trace_flags);
-};
-
-ROCKETMQ_NAMESPACE_END
-#endif
\ No newline at end of file
diff --git a/src/main/cpp/concurrent/include/CountdownLatch.h b/src/main/cpp/concurrent/include/CountdownLatch.h
index d873375..6ef2183 100644
--- a/src/main/cpp/concurrent/include/CountdownLatch.h
+++ b/src/main/cpp/concurrent/include/CountdownLatch.h
@@ -1,10 +1,12 @@
#pragma once
+#include <string>
+
#include "absl/base/thread_annotations.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
+
#include "rocketmq/RocketMQ.h"
-#include <string>
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/rocketmq/ClientImpl.cpp b/src/main/cpp/rocketmq/ClientImpl.cpp
index d029f83..df1b5ae 100644
--- a/src/main/cpp/rocketmq/ClientImpl.cpp
+++ b/src/main/cpp/rocketmq/ClientImpl.cpp
@@ -22,7 +22,7 @@
ROCKETMQ_NAMESPACE_BEGIN
-ClientImpl::ClientImpl(std::string group_name) : ClientConfigImpl(std::move(group_name)), state_(State::CREATED) {}
+ClientImpl::ClientImpl(absl::string_view group_name) : ClientConfigImpl(group_name), state_(State::CREATED) {}
void ClientImpl::start() {
State expected = CREATED;
diff --git a/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp b/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
index c08eac8..d74274f 100644
--- a/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
+++ b/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
@@ -1,3 +1,14 @@
+#include <limits>
+#include <string>
+#include <utility>
+
+#include "absl/memory/memory.h"
+#include "absl/strings/str_join.h"
+#include "absl/time/time.h"
+#include "absl/types/span.h"
+#include "opencensus/trace/propagation/trace_context.h"
+#include "opencensus/trace/span.h"
+
#include "ConsumeMessageService.h"
#include "LoggerImpl.h"
#include "MessageAccessor.h"
@@ -5,20 +16,10 @@
#include "OtlpExporter.h"
#include "Protocol.h"
#include "PushConsumer.h"
-#include "TracingUtility.h"
#include "UtilAll.h"
-#include "absl/memory/memory.h"
-#include "absl/strings/str_join.h"
-#include "absl/time/time.h"
-#include "absl/types/span.h"
-#include "opencensus/trace/propagation/trace_context.h"
-#include "opencensus/trace/span.h"
#include "rocketmq/ConsumeType.h"
#include "rocketmq/MQMessage.h"
#include "rocketmq/MessageListener.h"
-#include <limits>
-#include <string>
-#include <utility>
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/rocketmq/DefaultMQPullConsumer.cpp b/src/main/cpp/rocketmq/DefaultMQPullConsumer.cpp
index fd6f244..f46d02a 100644
--- a/src/main/cpp/rocketmq/DefaultMQPullConsumer.cpp
+++ b/src/main/cpp/rocketmq/DefaultMQPullConsumer.cpp
@@ -1,12 +1,13 @@
#include "rocketmq/DefaultMQPullConsumer.h"
+#include <memory>
+
#include "absl/strings/str_split.h"
#include "AwaitPullCallback.h"
#include "DynamicNameServerResolver.h"
#include "PullConsumerImpl.h"
#include "StaticNameServerResolver.h"
-#include <memory>
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/rocketmq/DefaultMQPushConsumer.cpp b/src/main/cpp/rocketmq/DefaultMQPushConsumer.cpp
index ab072fd..a5148a3 100644
--- a/src/main/cpp/rocketmq/DefaultMQPushConsumer.cpp
+++ b/src/main/cpp/rocketmq/DefaultMQPushConsumer.cpp
@@ -1,8 +1,5 @@
#include <chrono>
#include <memory>
-#include <set>
-
-#include "absl/strings/str_split.h"
#include "DynamicNameServerResolver.h"
#include "PushConsumerImpl.h"
@@ -11,24 +8,15 @@
ROCKETMQ_NAMESPACE_BEGIN
-static std::set<std::string> consumerTable{};
-DefaultMQPushConsumer::DefaultMQPushConsumer(const std::string& group_name) : group_name_(group_name) {
- if (consumerTable.count(group_name)) {
- SPDLOG_ERROR("create consumer with same group name in a process, group name : {}", group_name);
- std::string err_msg = "create consumer with same group name in a process, group name :" + group_name;
- THROW_MQ_EXCEPTION(MQClientException, err_msg, -1);
- } else {
- impl_ = std::make_shared<PushConsumerImpl>(group_name);
- consumerTable.insert(group_name);
- }
+DefaultMQPushConsumer::DefaultMQPushConsumer(const std::string& group_name) {
+ impl_ = std::make_shared<PushConsumerImpl>(group_name);
}
void DefaultMQPushConsumer::start() { impl_->start(); }
void DefaultMQPushConsumer::shutdown() {
impl_->shutdown();
- consumerTable.erase(group_name_);
- SPDLOG_DEBUG("DefaultMQPushConsumerImpl shared_ptr use_count : {}", impl_.use_count());
+ SPDLOG_DEBUG("PushConsumerImpl shared_ptr use_count={}", impl_.use_count());
}
void DefaultMQPushConsumer::subscribe(const std::string& topic, const std::string& expression,
@@ -92,4 +80,6 @@
void DefaultMQPushConsumer::setMessageModel(MessageModel message_model) { impl_->setMessageModel(message_model); }
+std::string DefaultMQPushConsumer::groupName() const { return impl_->getGroupName(); }
+
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/ProducerImpl.cpp b/src/main/cpp/rocketmq/ProducerImpl.cpp
index fb0a6d5..9f174fd 100644
--- a/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -1,5 +1,11 @@
#include "ProducerImpl.h"
+#include <atomic>
+
+#include "absl/strings/str_join.h"
+#include "opencensus/trace/propagation/trace_context.h"
+#include "opencensus/trace/span.h"
+
#include "MessageAccessor.h"
#include "MessageGroupQueueSelector.h"
#include "MetadataConstants.h"
@@ -13,20 +19,16 @@
#include "TransactionImpl.h"
#include "UniqueIdGenerator.h"
#include "UtilAll.h"
-#include "absl/strings/str_join.h"
-#include "opencensus/trace/propagation/trace_context.h"
-#include "opencensus/trace/span.h"
#include "rocketmq/ErrorCode.h"
#include "rocketmq/MQClientException.h"
#include "rocketmq/MQMessage.h"
#include "rocketmq/MQMessageQueue.h"
#include "rocketmq/Transaction.h"
-#include <atomic>
ROCKETMQ_NAMESPACE_BEGIN
-ProducerImpl::ProducerImpl(std::string group_name)
- : ClientImpl(std::move(group_name)), compress_body_threshold_(MixAll::DEFAULT_COMPRESS_BODY_THRESHOLD_) {
+ProducerImpl::ProducerImpl(absl::string_view group_name)
+ : ClientImpl(group_name), compress_body_threshold_(MixAll::DEFAULT_COMPRESS_BODY_THRESHOLD_) {
// TODO: initialize client_config_ and fault_strategy_
}
diff --git a/src/main/cpp/rocketmq/PushConsumerImpl.cpp b/src/main/cpp/rocketmq/PushConsumerImpl.cpp
index 09fdb35..99550b1 100644
--- a/src/main/cpp/rocketmq/PushConsumerImpl.cpp
+++ b/src/main/cpp/rocketmq/PushConsumerImpl.cpp
@@ -16,7 +16,7 @@
ROCKETMQ_NAMESPACE_BEGIN
-PushConsumerImpl::PushConsumerImpl(std::string group_name) : ClientImpl(std::move(group_name)) {}
+PushConsumerImpl::PushConsumerImpl(absl::string_view group_name) : ClientImpl(group_name) {}
PushConsumerImpl::~PushConsumerImpl() { SPDLOG_DEBUG("DefaultMQPushConsumerImpl is destructed"); }
diff --git a/src/main/cpp/rocketmq/include/AwaitPullCallback.h b/src/main/cpp/rocketmq/include/AwaitPullCallback.h
index 718be51..95d9572 100644
--- a/src/main/cpp/rocketmq/include/AwaitPullCallback.h
+++ b/src/main/cpp/rocketmq/include/AwaitPullCallback.h
@@ -1,4 +1,5 @@
#include "absl/synchronization/mutex.h"
+
#include "rocketmq/AsyncCallback.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/rocketmq/include/ClientImpl.h b/src/main/cpp/rocketmq/include/ClientImpl.h
index a3046cf..40a9756 100644
--- a/src/main/cpp/rocketmq/include/ClientImpl.h
+++ b/src/main/cpp/rocketmq/include/ClientImpl.h
@@ -3,6 +3,7 @@
#include <chrono>
#include <cstdint>
+#include "absl/strings/string_view.h"
#include "apache/rocketmq/v1/definition.pb.h"
#include "Client.h"
@@ -17,9 +18,9 @@
ROCKETMQ_NAMESPACE_BEGIN
-class ClientImpl : virtual public Client, virtual public ClientConfigImpl {
+class ClientImpl : public ClientConfigImpl, virtual public Client {
public:
- explicit ClientImpl(std::string group_name);
+ explicit ClientImpl(absl::string_view group_name);
~ClientImpl() override = default;
diff --git a/src/main/cpp/rocketmq/include/Consumer.h b/src/main/cpp/rocketmq/include/Consumer.h
index 860a342..20751b8 100644
--- a/src/main/cpp/rocketmq/include/Consumer.h
+++ b/src/main/cpp/rocketmq/include/Consumer.h
@@ -1,10 +1,11 @@
#pragma once
+#include "absl/container/flat_hash_map.h"
+#include "absl/types/optional.h"
+
#include "Client.h"
#include "FilterExpression.h"
#include "ReceiveMessageAction.h"
-#include "absl/container/flat_hash_map.h"
-#include "absl/types/optional.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/rocketmq/include/FilterExpression.h b/src/main/cpp/rocketmq/include/FilterExpression.h
index c71fe9e..3c4e3cc 100644
--- a/src/main/cpp/rocketmq/include/FilterExpression.h
+++ b/src/main/cpp/rocketmq/include/FilterExpression.h
@@ -1,7 +1,9 @@
#pragma once
+
+#include <string>
+
#include "rocketmq/MQMessageExt.h"
#include "rocketmq/ExpressionType.h"
-#include <string>
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/rocketmq/include/ProcessQueue.h b/src/main/cpp/rocketmq/include/ProcessQueue.h
index 61ee74d..a73b67e 100644
--- a/src/main/cpp/rocketmq/include/ProcessQueue.h
+++ b/src/main/cpp/rocketmq/include/ProcessQueue.h
@@ -1,10 +1,11 @@
#pragma once
+#include <memory>
+
#include "ConsumeMessageType.h"
#include "FilterExpression.h"
#include "ReceiveMessageCallback.h"
#include "rocketmq/MQMessageExt.h"
-#include <memory>
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/rocketmq/include/ProducerImpl.h b/src/main/cpp/rocketmq/include/ProducerImpl.h
index 043fdc4..43459c2 100644
--- a/src/main/cpp/rocketmq/include/ProducerImpl.h
+++ b/src/main/cpp/rocketmq/include/ProducerImpl.h
@@ -5,6 +5,8 @@
#include <mutex>
#include <string>
+#include "absl/strings/string_view.h"
+
#include "ClientImpl.h"
#include "ClientManagerImpl.h"
#include "MixAll.h"
@@ -23,7 +25,7 @@
class ProducerImpl : virtual public ClientImpl, public std::enable_shared_from_this<ProducerImpl> {
public:
- explicit ProducerImpl(std::string group_name);
+ explicit ProducerImpl(absl::string_view group_name);
~ProducerImpl() override;
@@ -52,9 +54,11 @@
std::unique_ptr<TransactionImpl> prepare(MQMessage& message);
- bool commit(const std::string& message_id, const std::string& transaction_id, const std::string& trace_context, const std::string& target);
+ bool commit(const std::string& message_id, const std::string& transaction_id, const std::string& trace_context,
+ const std::string& target);
- bool rollback(const std::string& message_id, const std::string& transaction_id, const std::string& trace_context, const std::string& target);
+ bool rollback(const std::string& message_id, const std::string& transaction_id, const std::string& trace_context,
+ const std::string& target);
/**
* Check if the RPC client for the target host is isolated or not
diff --git a/src/main/cpp/rocketmq/include/PullConsumerImpl.h b/src/main/cpp/rocketmq/include/PullConsumerImpl.h
index 2b8cc53..6145343 100644
--- a/src/main/cpp/rocketmq/include/PullConsumerImpl.h
+++ b/src/main/cpp/rocketmq/include/PullConsumerImpl.h
@@ -3,6 +3,8 @@
#include <future>
#include <memory>
+#include "absl/strings/string_view.h"
+
#include "ClientConfig.h"
#include "ClientImpl.h"
#include "ClientManagerImpl.h"
@@ -14,7 +16,7 @@
class PullConsumerImpl : public ClientImpl, public std::enable_shared_from_this<PullConsumerImpl> {
public:
- explicit PullConsumerImpl(std::string group_name) : ClientImpl(std::move(group_name)) {}
+ explicit PullConsumerImpl(absl::string_view group_name) : ClientImpl(group_name) {}
void start() override;
diff --git a/src/main/cpp/rocketmq/include/PushConsumer.h b/src/main/cpp/rocketmq/include/PushConsumer.h
index 183c0bd..c71bd1e 100644
--- a/src/main/cpp/rocketmq/include/PushConsumer.h
+++ b/src/main/cpp/rocketmq/include/PushConsumer.h
@@ -1,13 +1,14 @@
#pragma once
+#include <functional>
+#include <memory>
+
#include "ConsumeMessageService.h"
#include "Consumer.h"
#include "ProcessQueue.h"
#include "rocketmq/Executor.h"
#include "rocketmq/MessageListener.h"
#include "rocketmq/MessageModel.h"
-#include <functional>
-#include <memory>
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/rocketmq/include/PushConsumerImpl.h b/src/main/cpp/rocketmq/include/PushConsumerImpl.h
index 64d2f5f..c03653b 100644
--- a/src/main/cpp/rocketmq/include/PushConsumerImpl.h
+++ b/src/main/cpp/rocketmq/include/PushConsumerImpl.h
@@ -4,6 +4,8 @@
#include <mutex>
#include <string>
+#include "absl/strings/string_view.h"
+
#include "ClientConfigImpl.h"
#include "ClientImpl.h"
#include "ClientManagerImpl.h"
@@ -30,7 +32,7 @@
virtual public PushConsumer,
public std::enable_shared_from_this<PushConsumerImpl> {
public:
- explicit PushConsumerImpl(std::string group_name);
+ explicit PushConsumerImpl(absl::string_view group_name);
~PushConsumerImpl() override;
diff --git a/src/main/cpp/rocketmq/include/ReceiveMessageAction.h b/src/main/cpp/rocketmq/include/ReceiveMessageAction.h
index 7805e61..6dea6a9 100644
--- a/src/main/cpp/rocketmq/include/ReceiveMessageAction.h
+++ b/src/main/cpp/rocketmq/include/ReceiveMessageAction.h
@@ -1,9 +1,9 @@
#pragma once
-#include "rocketmq/RocketMQ.h"
-
#include <cstdint>
+#include "rocketmq/RocketMQ.h"
+
ROCKETMQ_NAMESPACE_BEGIN
enum class ReceiveMessageAction : std::int8_t {
diff --git a/src/main/cpp/rocketmq/include/StaticNameServerResolver.h b/src/main/cpp/rocketmq/include/StaticNameServerResolver.h
index 1d62106..ed798b4 100644
--- a/src/main/cpp/rocketmq/include/StaticNameServerResolver.h
+++ b/src/main/cpp/rocketmq/include/StaticNameServerResolver.h
@@ -13,7 +13,7 @@
class StaticNameServerResolver : public NameServerResolver {
public:
- StaticNameServerResolver(absl::string_view name_server_list);
+ explicit StaticNameServerResolver(absl::string_view name_server_list);
void start() override {}
diff --git a/src/main/cpp/rocketmq/include/TransactionImpl.h b/src/main/cpp/rocketmq/include/TransactionImpl.h
index 7fac135..be33cee 100644
--- a/src/main/cpp/rocketmq/include/TransactionImpl.h
+++ b/src/main/cpp/rocketmq/include/TransactionImpl.h
@@ -1,10 +1,10 @@
#pragma once
-#include "rocketmq/Transaction.h"
-
#include <memory>
#include <string>
+#include "rocketmq/Transaction.h"
+
ROCKETMQ_NAMESPACE_BEGIN
class ProducerImpl;
diff --git a/src/main/cpp/scheduler/include/SchedulerImpl.h b/src/main/cpp/scheduler/include/SchedulerImpl.h
index f5f6504..fdbe98f 100644
--- a/src/main/cpp/scheduler/include/SchedulerImpl.h
+++ b/src/main/cpp/scheduler/include/SchedulerImpl.h
@@ -1,6 +1,4 @@
#pragma once
-#include "Scheduler.h"
-
#include <atomic>
#include <chrono>
#include <cstdint>
@@ -13,6 +11,8 @@
#include "absl/container/flat_hash_map.h"
#include "absl/synchronization/mutex.h"
#include "asio.hpp"
+
+#include "Scheduler.h"
#include "rocketmq/State.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/main/cpp/tracing/exporters/include/OtlpExporter.h b/src/main/cpp/tracing/exporters/include/OtlpExporter.h
index e8a8204..536f8d2 100644
--- a/src/main/cpp/tracing/exporters/include/OtlpExporter.h
+++ b/src/main/cpp/tracing/exporters/include/OtlpExporter.h
@@ -1,8 +1,11 @@
#pragma once
-#include "ClientConfig.h"
-#include "ClientManager.h"
-#include "InvocationContext.h"
+#include <algorithm>
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include <thread>
+
#include "absl/container/flat_hash_map.h"
#include "absl/memory/memory.h"
#include "absl/synchronization/mutex.h"
@@ -10,12 +13,11 @@
#include "opencensus/trace/exporter/span_exporter.h"
#include "opencensus/trace/sampler.h"
#include "opentelemetry/proto/collector/trace/v1/trace_service.grpc.pb.h"
+
+#include "ClientConfig.h"
+#include "ClientManager.h"
+#include "InvocationContext.h"
#include "rocketmq/RocketMQ.h"
-#include <algorithm>
-#include <atomic>
-#include <cstdint>
-#include <memory>
-#include <thread>
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/src/test/cpp/it/RpcClientTest.cpp b/src/test/cpp/it/RpcClientTest.cpp
index cd67034..c9dddab 100644
--- a/src/test/cpp/it/RpcClientTest.cpp
+++ b/src/test/cpp/it/RpcClientTest.cpp
@@ -13,9 +13,9 @@
#include "rocketmq/Logger.h"
#include "spdlog/spdlog.h"
#include "gtest/gtest.h"
+#include <iostream>
#include <thread>
#include <unordered_map>
-#include <iostream>
using namespace testing;
@@ -174,7 +174,7 @@
std::string region_id_{"cn-hangzhou"};
std::string service_name_{"MQ"};
absl::flat_hash_map<std::string, std::string> metadata_;
- ClientConfigImpl client_config_;
+ ClientConfigImpl client_config_{group_};
CredentialsProviderPtr credentials_provider_;
std::shared_ptr<grpc::experimental::StaticDataCertificateProvider> certificate_provider_;
grpc::experimental::TlsChannelCredentialsOptions tls_channel_credential_options_;
diff --git a/src/test/cpp/it/TopicPublishInfoTest.cpp b/src/test/cpp/it/TopicPublishInfoTest.cpp
index 488a227..a767453 100644
--- a/src/test/cpp/it/TopicPublishInfoTest.cpp
+++ b/src/test/cpp/it/TopicPublishInfoTest.cpp
@@ -1,12 +1,12 @@
#include "TopicPublishInfo.h"
+#include "ClientConfigImpl.h"
#include "LogInterceptorFactory.h"
#include "RpcClientImpl.h"
#include "Signature.h"
#include "TlsHelper.h"
-#include "rocketmq/MQMessageQueue.h"
-#include "ClientConfigImpl.h"
-#include "gtest/gtest.h"
#include "grpcpp/security/tls_credentials_options.h"
+#include "rocketmq/MQMessageQueue.h"
+#include "gtest/gtest.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -62,7 +62,7 @@
std::string region_id_{"cn-hangzhou"};
std::string service_name_{"MQ"};
std::string target_{"dns:grpc.dev:9876"};
- ClientConfigImpl client_config_;
+ ClientConfigImpl client_config_{group_};
absl::flat_hash_map<std::string, std::string> metadata_;
std::shared_ptr<grpc::CompletionQueue> completion_queue_;
std::shared_ptr<RpcClientImpl> client_;
@@ -81,7 +81,7 @@
request.mutable_topic()->set_name(topic_);
auto invocation_context = new InvocationContext<QueryRouteResponse>();
invocation_context->context.set_deadline(std::chrono::system_clock::now() +
- absl::ToChronoMilliseconds(client_config_.getIoTimeout()));
+ absl::ToChronoMilliseconds(client_config_.getIoTimeout()));
for (const auto& item : metadata_) {
invocation_context->context.AddMetadata(item.first, item.second);
}
diff --git a/src/test/cpp/ut/rocketmq/BUILD.bazel b/src/test/cpp/ut/rocketmq/BUILD.bazel
index 47fa3d8..b1fce27 100644
--- a/src/test/cpp/ut/rocketmq/BUILD.bazel
+++ b/src/test/cpp/ut/rocketmq/BUILD.bazel
@@ -223,4 +223,15 @@
"//src/main/cpp/base/mocks:base_mocks",
"@com_google_googletest//:gtest_main",
],
+)
+
+cc_test(
+ name = "default_mq_push_consumer_test",
+ srcs = [
+ "DefaultMQPushConsumerTest.cpp",
+ ],
+ deps = [
+ "//src/main/cpp/rocketmq:rocketmq_library",
+ "@com_google_googletest//:gtest_main",
+ ],
)
\ No newline at end of file
diff --git a/src/test/cpp/ut/rocketmq/DefaultMQPushConsumerTest.cpp b/src/test/cpp/ut/rocketmq/DefaultMQPushConsumerTest.cpp
new file mode 100644
index 0000000..845966f
--- /dev/null
+++ b/src/test/cpp/ut/rocketmq/DefaultMQPushConsumerTest.cpp
@@ -0,0 +1,18 @@
+#include "rocketmq/DefaultMQPushConsumer.h"
+
+#include "gtest/gtest.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class DefaultMQPushConsumerTest : public testing::Test {
+public:
+ DefaultMQPushConsumerTest() : group_name_("group-0"), consumer_(group_name_) {}
+
+protected:
+ std::string group_name_;
+ DefaultMQPushConsumer consumer_;
+};
+
+TEST_F(DefaultMQPushConsumerTest, testGroupName) { EXPECT_EQ(group_name_, consumer_.groupName()); }
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file