[ISSUE #1001] C++ client should block on build when syncing settings fails (#1002)
diff --git a/.github/workflows/cpp_build.yml b/.github/workflows/cpp_build.yml
index 5dc0302..667b6af 100644
--- a/.github/workflows/cpp_build.yml
+++ b/.github/workflows/cpp_build.yml
@@ -11,7 +11,7 @@
# Disable VS 2022 before https://github.com/bazelbuild/bazel/issues/18592 issue is solved
# Remove macos-11 since there is no such runner available
# os: [ubuntu-20.04, ubuntu-22.04, macos-11, macos-12, windows-2019, windows-2022]
- os: [ubuntu-20.04, ubuntu-22.04, windows-2019]
+ os: [ubuntu-22.04, windows-2019]
steps:
- uses: actions/checkout@v2
- name: Compile On Linux
diff --git a/cpp/source/client/ClientManagerImpl.cpp b/cpp/source/client/ClientManagerImpl.cpp
index 3b22ddd..95aaa4f 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -94,22 +94,21 @@
SPDLOG_WARN("Unexpected client instance state: {}", state_.load(std::memory_order_relaxed));
return;
}
+
state_.store(State::STARTING, std::memory_order_relaxed);
callback_thread_pool_->start();
-
scheduler_->start();
std::weak_ptr<ClientManagerImpl> client_instance_weak_ptr = shared_from_this();
-
auto heartbeat_functor = [client_instance_weak_ptr]() {
auto client_instance = client_instance_weak_ptr.lock();
if (client_instance) {
client_instance->doHeartbeat();
}
};
- heartbeat_task_id_ =
- scheduler_->schedule(heartbeat_functor, HEARTBEAT_TASK_NAME, std::chrono::seconds(1), std::chrono::seconds(10));
+ heartbeat_task_id_ = scheduler_->schedule(
+ heartbeat_functor, HEARTBEAT_TASK_NAME, std::chrono::seconds(1), std::chrono::seconds(10));
SPDLOG_DEBUG("Heartbeat task-id={}", heartbeat_task_id_);
state_.store(State::STARTED, std::memory_order_relaxed);
diff --git a/cpp/source/client/TelemetryBidiReactor.cpp b/cpp/source/client/TelemetryBidiReactor.cpp
index 74e8689..2e12f43 100644
--- a/cpp/source/client/TelemetryBidiReactor.cpp
+++ b/cpp/source/client/TelemetryBidiReactor.cpp
@@ -38,7 +38,6 @@
auto ptr = client_.lock();
auto deadline = std::chrono::system_clock::now() + std::chrono::hours(1);
context_.set_deadline(deadline);
- sync_settings_future_ = sync_settings_promise_.get_future();
Metadata metadata;
Signature::sign(ptr->config(), metadata);
for (const auto& entry : metadata) {
@@ -56,8 +55,19 @@
}
bool TelemetryBidiReactor::awaitApplyingSettings() {
- sync_settings_future_.get();
- return true;
+ auto settings_future = sync_settings_promise_.get_future();
+ std::future_status status = settings_future.wait_for(std::chrono::seconds(3));
+ if (status == std::future_status::ready) {
+ if (settings_future.get()) {
+ return true;
+ }
+ }
+ {
+ absl::MutexLock lk(&state_mtx_);
+ state_ = StreamState::Closed;
+ state_cv_.SignalAll();
+ }
+ return false;
}
void TelemetryBidiReactor::OnWriteDone(bool ok) {
@@ -283,21 +293,29 @@
static_cast<std::uint8_t>(state_));
return;
}
+
if (writes_.empty()) {
- SPDLOG_DEBUG("No TelemetryCommand to write. Peer={}", peer_address_);
+ SPDLOG_DEBUG("No pending TelemetryCommand to write. Peer={}", peer_address_);
return;
}
if (!writes_.empty()) {
- SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, writes_.front().ShortDebugString());
- AddHold();
- StartWrite(&(writes_.front()));
+ SPDLOG_DEBUG("Writing TelemetryCommand to {}: {}", peer_address_, writes_.front().ShortDebugString());
+ if (StreamState::Ready == state_) {
+ AddHold();
+ StartWrite(&(writes_.front()));
+ } else {
+ SPDLOG_WARN("Writing TelemetryCommand error due to unexpected state. State={}, Peer={}",
+ static_cast<uint8_t>(state_), peer_address_);
+ }
}
}
void TelemetryBidiReactor::signalClose() {
absl::MutexLock lk(&state_mtx_);
- state_ = StreamState::Closing;
+ if (state_ == StreamState::Ready) {
+ state_ = StreamState::Closing;
+ }
}
void TelemetryBidiReactor::close() {
@@ -361,7 +379,7 @@
if (!ok) {
// for read stream
// Remove the hold corresponding to AddHold in TelemetryBidiReactor::TelemetryBidiReactor.
- RemoveHold();
+ // RemoveHold();
SPDLOG_DEBUG("Change state {} --> {}", static_cast<std::uint8_t>(state_),
static_cast<std::uint8_t>(StreamState::Closing));
diff --git a/cpp/source/client/include/TelemetryBidiReactor.h b/cpp/source/client/include/TelemetryBidiReactor.h
index 4bd58f9..3217fd4 100644
--- a/cpp/source/client/include/TelemetryBidiReactor.h
+++ b/cpp/source/client/include/TelemetryBidiReactor.h
@@ -40,19 +40,25 @@
Closed = 2,
};
-/// stream-state: ready --> closing --> closed
+/// TelemetryBidiReactor: Manages a bidirectional gRPC stream for telemetry data
///
-/// requirement:
-/// 1, close --> blocking wait till bidireactor is closed;
-/// 2, when session is closed and client is still active, recreate a new session to accept incoming commands from
-/// server
+/// Stream State Transitions:
+/// Ready --> Closing --> Closed
///
+/// Key Features:
+/// 1. Close Operation: Performs a blocking wait until the bidirectional reactor is fully closed.
+/// 2. Session Management: If the session closes while the client is still active,
+/// it automatically initiates the creation of a new session to maintain
+/// communication with the server.
+///
+/// The reactor handles reading from and writing to the stream, manages stream state,
+/// and applies settings received from the server.
class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand, TelemetryCommand>,
public std::enable_shared_from_this<TelemetryBidiReactor> {
public:
TelemetryBidiReactor(std::weak_ptr<Client> client, rmq::MessagingService::Stub* stub, std::string peer_address);
- ~TelemetryBidiReactor();
+ ~TelemetryBidiReactor() override;
/// Notifies the application that all operations associated with this RPC
/// have completed and all Holds have been removed. OnDone provides the RPC
@@ -125,7 +131,6 @@
absl::CondVar state_cv_;
std::promise<bool> sync_settings_promise_;
- std::future<bool> sync_settings_future_;
void applySettings(const rmq::Settings& settings);
@@ -137,6 +142,7 @@
/// Attempt to write pending telemetry command to server.
void tryWriteNext() LOCKS_EXCLUDED(state_mtx_, writes_mtx_);
+
void signalClose();
};
diff --git a/cpp/source/rocketmq/ClientImpl.cpp b/cpp/source/rocketmq/ClientImpl.cpp
index 7f91b04..a5cfae8 100644
--- a/cpp/source/rocketmq/ClientImpl.cpp
+++ b/cpp/source/rocketmq/ClientImpl.cpp
@@ -106,26 +106,30 @@
name_server_resolver_->start();
client_config_.client_id = clientId();
-
if (!client_manager_) {
- client_manager_ = std::make_shared<ClientManagerImpl>(client_config_.resource_namespace, client_config_.withSsl);
+ client_manager_ = std::make_shared<ClientManagerImpl>(
+ client_config_.resource_namespace, client_config_.withSsl);
+ client_manager_->start();
}
- client_manager_->start();
const auto& endpoint = name_server_resolver_->resolve();
if (endpoint.empty()) {
SPDLOG_ERROR("Failed to resolve name server address");
- abort();
+ return;
}
- createSession(endpoint, false);
- {
- absl::MutexLock lk(&session_map_mtx_);
- session_map_[endpoint]->await();
+ while (true) {
+ createSession(endpoint, false);
+ {
+ absl::MutexLock lk(&session_map_mtx_);
+ if (session_map_.contains(endpoint) && session_map_[endpoint]->await()) {
+ break;
+ }
+ session_map_.erase(endpoint);
+ }
}
std::weak_ptr<ClientImpl> ptr(self());
-
{
// Query routes for topics of interest in synchronous
std::vector<std::string> topics;
@@ -164,8 +168,9 @@
}
};
- route_update_handle_ = client_manager_->getScheduler()->schedule(route_update_functor, UPDATE_ROUTE_TASK_NAME,
- std::chrono::seconds(10), std::chrono::seconds(30));
+ route_update_handle_ = client_manager_->getScheduler()->schedule(
+ route_update_functor, UPDATE_ROUTE_TASK_NAME,
+ std::chrono::seconds(10), std::chrono::seconds(30));
auto telemetry_functor = [ptr]() {
std::shared_ptr<ClientImpl> base = ptr.lock();
@@ -597,8 +602,11 @@
Signature::sign(client_config_, metadata);
for (const auto& endpoint : endpoints) {
- client_manager_->notifyClientTermination(endpoint, metadata, request,
- absl::ToChronoMilliseconds(client_config_.request_timeout));
+ std::error_code ec = client_manager_->notifyClientTermination(
+ endpoint, metadata, request,absl::ToChronoMilliseconds(client_config_.request_timeout));
+ if (ec) {
+ SPDLOG_WARN("Notify client termination error, ErrorCode={}, Endpoint={}", ec.message(), endpoint);
+ }
}
}
diff --git a/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp b/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp
index 2b5e7c7..57268f3 100644
--- a/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp
+++ b/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp
@@ -72,7 +72,7 @@
return;
}
- for (auto message : messages) {
+ for (const auto& message : messages) {
auto consume_task = std::make_shared<ConsumeTask>(shared_from_this(), process_queue, message);
pool_->submit([consume_task]() { consume_task->process(); });
}
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp b/cpp/source/rocketmq/ProducerImpl.cpp
index 7ea7d4f..b7dd82e 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -56,7 +56,7 @@
State expecting = State::STARTING;
if (!state_.compare_exchange_strong(expecting, State::STARTED)) {
- SPDLOG_ERROR("Start with unexpected state. Expecting: {}, Actual: {}", State::STARTING,
+ SPDLOG_ERROR("Producer started with an unexpected state. Expecting: {}, Actual: {}", State::STARTING,
state_.load(std::memory_order_relaxed));
return;
}
diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp b/cpp/source/rocketmq/PushConsumerImpl.cpp
index 68bb9f6..acc0817 100644
--- a/cpp/source/rocketmq/PushConsumerImpl.cpp
+++ b/cpp/source/rocketmq/PushConsumerImpl.cpp
@@ -65,7 +65,6 @@
if (!message_listener_) {
SPDLOG_ERROR("Required message listener is missing");
abort();
- return;
}
client_config_.subscriber.group.set_resource_namespace(resourceNamespace());
diff --git a/cpp/source/rocketmq/include/ConsumeMessageServiceImpl.h b/cpp/source/rocketmq/include/ConsumeMessageServiceImpl.h
index f2f54bb..ee0e78b 100644
--- a/cpp/source/rocketmq/include/ConsumeMessageServiceImpl.h
+++ b/cpp/source/rocketmq/include/ConsumeMessageServiceImpl.h
@@ -43,8 +43,8 @@
* Make it noncopyable.
*/
ConsumeMessageServiceImpl(const ConsumeMessageServiceImpl &other) = delete;
- ConsumeMessageServiceImpl &
- operator=(const ConsumeMessageServiceImpl &other) = delete;
+
+ ConsumeMessageServiceImpl &operator=(const ConsumeMessageServiceImpl &other) = delete;
void start() override;