Broadcasting (#25)
* Complete heartbeat data
* Add broadcasting example
diff --git a/example/rocketmq/ExampleBroadcastPushConsumer.cpp b/example/rocketmq/ExampleBroadcastPushConsumer.cpp
index 2d59b0e..64b527a 100644
--- a/example/rocketmq/ExampleBroadcastPushConsumer.cpp
+++ b/example/rocketmq/ExampleBroadcastPushConsumer.cpp
@@ -1,12 +1,10 @@
#include "rocketmq/DefaultMQPushConsumer.h"
-#ifndef SPDLOG_ACTIVE_LEVEL
-#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_DEBUG
-#endif
-
+#include "rocketmq/Logger.h"
#include "spdlog/spdlog.h"
#include <chrono>
+#include <iostream>
#include <mutex>
#include <thread>
@@ -17,6 +15,7 @@
ConsumeMessageResult consumeMessage(const std::vector<MQMessageExt>& msgs) override {
for (const MQMessageExt& msg : msgs) {
SPDLOG_INFO("Receive a message. MessageId={}", msg.getMsgId());
+ std::cout << "Received a message. MessageId: " << msg.getMsgId() << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
return ConsumeMessageResult::SUCCESS;
@@ -24,7 +23,6 @@
};
int main(int argc, char* argv[]) {
-
Logger& logger = getLogger();
logger.setLevel(Level::Debug);
logger.init();
diff --git a/src/main/cpp/client/ClientManagerImpl.cpp b/src/main/cpp/client/ClientManagerImpl.cpp
index 6a4068b..b662e26 100644
--- a/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/src/main/cpp/client/ClientManagerImpl.cpp
@@ -1348,6 +1348,11 @@
SPDLOG_WARN("Unauthenticated: {}, host={}", common.status().message(), invocation_context->remote_address);
ec = ErrorCode::Unauthorized;
} break;
+ case google::rpc::Code::NOT_FOUND: {
+ SPDLOG_WARN("NotFound: {}, host={}", common.status().message(), invocation_context->remote_address);
+ ec = ErrorCode::NotFound;
+ break;
+ }
case google::rpc::Code::DEADLINE_EXCEEDED: {
SPDLOG_WARN("DeadlineExceeded: {}, host={}", common.status().message(), invocation_context->remote_address);
ec = ErrorCode::GatewayTimeout;
diff --git a/src/main/cpp/rocketmq/PushConsumerImpl.cpp b/src/main/cpp/rocketmq/PushConsumerImpl.cpp
index 9927f92..4386b84 100644
--- a/src/main/cpp/rocketmq/PushConsumerImpl.cpp
+++ b/src/main/cpp/rocketmq/PushConsumerImpl.cpp
@@ -581,7 +581,6 @@
}
auto subscriptions = consumer_data->mutable_subscriptions();
-
{
absl::MutexLock lk(&topic_filter_expression_table_mtx_);
for (const auto& entry : topic_filter_expression_table_) {
@@ -613,6 +612,10 @@
break;
}
}
+
+ consumer_data->set_consume_policy(rmq::ConsumePolicy::RESUME);
+ consumer_data->mutable_dead_letter_policy()->set_max_delivery_attempts(maxDeliveryAttempts());
+ consumer_data->set_consume_type(rmq::ConsumeMessageType::PASSIVE);
}
ClientResourceBundle PushConsumerImpl::resourceBundle() {