| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "TelemetryBidiReactor.h" |
| |
| #include <atomic> |
| #include <cstdint> |
| #include <memory> |
| #include <utility> |
| |
| #include "ClientManager.h" |
| #include "MessageExt.h" |
| #include "Metadata.h" |
| #include "RpcClient.h" |
| #include "Signature.h" |
| #include "google/protobuf/util/time_util.h" |
| #include "rocketmq/Logger.h" |
| #include "spdlog/spdlog.h" |
| |
| ROCKETMQ_NAMESPACE_BEGIN |
| |
| TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client, |
| rmq::MessagingService::Stub* stub, |
| std::string peer_address) |
| : client_(client), |
| peer_address_(std::move(peer_address)), |
| state_(StreamState::Ready) { |
| auto ptr = client_.lock(); |
| auto deadline = std::chrono::system_clock::now() + std::chrono::hours(1); |
| context_.set_deadline(deadline); |
| sync_settings_future_ = sync_settings_promise_.get_future(); |
| Metadata metadata; |
| Signature::sign(ptr->config(), metadata); |
| for (const auto& entry : metadata) { |
| context_.AddMetadata(entry.first, entry.second); |
| } |
| stub->async()->Telemetry(&context_, this); |
| StartRead(&read_); |
| // for read stream |
| AddHold(); |
| StartCall(); |
| } |
| |
| TelemetryBidiReactor::~TelemetryBidiReactor() { |
| SPDLOG_INFO("Telemetry stream for {} destructed. StreamState={}", peer_address_, static_cast<std::uint8_t>(state_)); |
| } |
| |
| bool TelemetryBidiReactor::awaitApplyingSettings() { |
| sync_settings_future_.get(); |
| return true; |
| } |
| |
| void TelemetryBidiReactor::OnWriteDone(bool ok) { |
| SPDLOG_DEBUG("{}#OnWriteDone", peer_address_); |
| |
| // for write stream |
| RemoveHold(); |
| |
| if (!ok) { |
| SPDLOG_WARN("Failed to write telemetry command {} to {}", writes_.front().DebugString(), peer_address_); |
| signalClose(); |
| return; |
| } |
| |
| // Remove the command that has been written to server. |
| { |
| absl::MutexLock lk(&writes_mtx_); |
| if (!writes_.empty()) { |
| writes_.pop_front(); |
| } |
| } |
| |
| tryWriteNext(); |
| } |
| |
| void TelemetryBidiReactor::OnReadDone(bool ok) { |
| SPDLOG_DEBUG("{}#OnReadDone", peer_address_); |
| if (!ok) { |
| // for read stream |
| RemoveHold(); |
| SPDLOG_WARN("Failed to read from telemetry stream from {}", peer_address_); |
| signalClose(); |
| return; |
| } |
| |
| { |
| absl::MutexLock lk(&state_mtx_); |
| if (StreamState::Ready != state_) { |
| return; |
| } |
| } |
| |
| SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, read_.DebugString()); |
| auto client = client_.lock(); |
| if (!client) { |
| SPDLOG_INFO("Client for {} has destructed", peer_address_); |
| signalClose(); |
| return; |
| } |
| |
| switch (read_.command_case()) { |
| case rmq::TelemetryCommand::kSettings: { |
| auto settings = read_.settings(); |
| SPDLOG_INFO("Received settings from {}: {}", peer_address_, settings.DebugString()); |
| applySettings(settings); |
| sync_settings_promise_.set_value(true); |
| break; |
| } |
| case rmq::TelemetryCommand::kRecoverOrphanedTransactionCommand: { |
| SPDLOG_DEBUG("Receive orphan transaction command: {}", read_.DebugString()); |
| auto message = client->manager()->wrapMessage(read_.release_verify_message_command()->message()); |
| auto raw = const_cast<Message*>(message.get()); |
| raw->mutableExtension().target_endpoint = peer_address_; |
| raw->mutableExtension().transaction_id = read_.recover_orphaned_transaction_command().transaction_id(); |
| client->recoverOrphanedTransaction(message); |
| |
| break; |
| } |
| |
| case rmq::TelemetryCommand::kPrintThreadStackTraceCommand: { |
| TelemetryCommand response; |
| response.mutable_thread_stack_trace()->set_nonce(read_.print_thread_stack_trace_command().nonce()); |
| response.mutable_thread_stack_trace()->set_thread_stack_trace("PrintStackTrace is not supported"); |
| write(std::move(response)); |
| break; |
| } |
| |
| case rmq::TelemetryCommand::kVerifyMessageCommand: { |
| std::weak_ptr<TelemetryBidiReactor> ptr(shared_from_this()); |
| auto cb = [ptr](TelemetryCommand command) { |
| auto reactor = ptr.lock(); |
| if (!reactor) { |
| return; |
| } |
| reactor->write(std::move(command)); |
| }; |
| auto message = client->manager()->wrapMessage(read_.verify_message_command().message()); |
| auto raw = const_cast<Message*>(message.get()); |
| raw->mutableExtension().target_endpoint = peer_address_; |
| raw->mutableExtension().nonce = read_.verify_message_command().nonce(); |
| client->verify(message, cb); |
| break; |
| } |
| |
| default: { |
| SPDLOG_WARN("Unsupported command"); |
| break; |
| } |
| } |
| |
| { |
| absl::MutexLock lk(&state_mtx_); |
| if (StreamState::Ready == state_) { |
| SPDLOG_DEBUG("Spawn new read op, state={}", static_cast<std::uint8_t>(state_)); |
| StartRead(&read_); |
| } |
| } |
| } |
| |
| void TelemetryBidiReactor::applySettings(const rmq::Settings& settings) { |
| auto ptr = client_.lock(); |
| if (!ptr) { |
| SPDLOG_INFO("Client for {} has destructed", peer_address_); |
| return; |
| } |
| |
| applyBackoffPolicy(settings, ptr); |
| |
| // Sync metrics collector configuration |
| if (settings.has_metric()) { |
| const auto& metric = settings.metric(); |
| ptr->config().metric.on = metric.on(); |
| ptr->config().metric.endpoints.set_scheme(metric.endpoints().scheme()); |
| ptr->config().metric.endpoints.mutable_addresses()->CopyFrom(metric.endpoints().addresses()); |
| } |
| |
| switch (settings.pub_sub_case()) { |
| case rmq::Settings::PubSubCase::kPublishing: { |
| applyPublishingConfig(settings, ptr); |
| break; |
| } |
| case rmq::Settings::PubSubCase::kSubscription: { |
| applySubscriptionConfig(settings, ptr); |
| break; |
| } |
| default: { |
| break; |
| } |
| } |
| } |
| |
| void TelemetryBidiReactor::applyBackoffPolicy(const rmq::Settings& settings, std::shared_ptr<Client>& ptr) { |
| // Apply backoff policy on throttling |
| if (settings.has_backoff_policy()) { |
| const auto& backoff_policy = settings.backoff_policy(); |
| ptr->config().backoff_policy.max_attempt = backoff_policy.max_attempts(); |
| if (backoff_policy.has_customized_backoff()) { |
| for (const auto& item : backoff_policy.customized_backoff().next()) { |
| auto backoff = std::chrono::seconds(item.seconds()) + std::chrono::nanoseconds(item.nanos()); |
| ptr->config().backoff_policy.next.push_back(absl::FromChrono(backoff)); |
| } |
| } |
| |
| if (backoff_policy.has_exponential_backoff()) { |
| const auto& exp_backoff = backoff_policy.exponential_backoff(); |
| ptr->config().backoff_policy.initial = absl::FromChrono(std::chrono::seconds(exp_backoff.initial().seconds()) + |
| std::chrono::nanoseconds(exp_backoff.initial().nanos())); |
| ptr->config().backoff_policy.multiplier = exp_backoff.multiplier(); |
| ptr->config().backoff_policy.max = absl::FromChrono(std::chrono::seconds(exp_backoff.max().seconds()) + |
| std::chrono::nanoseconds(exp_backoff.max().nanos())); |
| } |
| } |
| } |
| |
| void TelemetryBidiReactor::applyPublishingConfig(const rmq::Settings& settings, std::shared_ptr<Client> client) { |
| // The server may have implicitly assumed a namespace for the client. |
| if (!settings.publishing().topics().empty()) { |
| for (const auto& topic : settings.publishing().topics()) { |
| if (topic.resource_namespace() != client->config().resource_namespace) { |
| SPDLOG_INFO("Client namespace is changed from [{}] to [{}]", client->config().resource_namespace, |
| topic.resource_namespace()); |
| client->config().resource_namespace = topic.resource_namespace(); |
| break; |
| } |
| } |
| } |
| client->config().publisher.max_body_size = settings.publishing().max_body_size(); |
| } |
| |
| void TelemetryBidiReactor::applySubscriptionConfig(const rmq::Settings& settings, std::shared_ptr<Client> client) { |
| // The server may have implicitly assumed a namespace for the client. |
| if (!settings.subscription().subscriptions().empty()) { |
| for (const auto& subscription : settings.subscription().subscriptions()) { |
| if (subscription.topic().resource_namespace() != client->config().resource_namespace) { |
| SPDLOG_INFO("Client namespace is changed from [{}] to [{}]", client->config().resource_namespace, |
| subscription.topic().resource_namespace()); |
| client->config().resource_namespace = subscription.topic().resource_namespace(); |
| break; |
| } |
| } |
| } |
| |
| client->config().subscriber.fifo = settings.subscription().fifo(); |
| auto polling_timeout = |
| google::protobuf::util::TimeUtil::DurationToMilliseconds(settings.subscription().long_polling_timeout()); |
| client->config().subscriber.polling_timeout = absl::Milliseconds(polling_timeout); |
| client->config().subscriber.receive_batch_size = settings.subscription().receive_batch_size(); |
| } |
| |
| void TelemetryBidiReactor::write(TelemetryCommand command) { |
| SPDLOG_DEBUG("{}#write", peer_address_); |
| { |
| absl::MutexLock lk(&state_mtx_); |
| // Reject incoming write commands if the stream state is closing or has witnessed some error. |
| if (StreamState::Ready != state_) { |
| return; |
| } |
| } |
| |
| { |
| absl::MutexLock lk(&writes_mtx_); |
| writes_.push_back(command); |
| } |
| tryWriteNext(); |
| } |
| |
| void TelemetryBidiReactor::tryWriteNext() { |
| SPDLOG_DEBUG("{}#tryWriteNext", peer_address_); |
| absl::MutexLock lk(&writes_mtx_); |
| if (StreamState::Ready != state_) { |
| SPDLOG_WARN("Further write to {} is not allowed due to stream-state={}", peer_address_, |
| static_cast<std::uint8_t>(state_)); |
| return; |
| } |
| if (writes_.empty()) { |
| SPDLOG_DEBUG("No TelemetryCommand to write. Peer={}", peer_address_); |
| return; |
| } |
| |
| if (!writes_.empty()) { |
| SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, writes_.front().DebugString()); |
| AddHold(); |
| StartWrite(&(writes_.front())); |
| } |
| } |
| |
| void TelemetryBidiReactor::signalClose() { |
| absl::MutexLock lk(&state_mtx_); |
| state_ = StreamState::Closing; |
| } |
| |
| void TelemetryBidiReactor::close() { |
| SPDLOG_INFO("{}#fireClose", peer_address_); |
| |
| { |
| absl::MutexLock lk(&state_mtx_); |
| if (state_ == StreamState::Ready) { |
| state_ = StreamState::Closing; |
| } |
| } |
| |
| { |
| absl::MutexLock lk(&writes_mtx_); |
| writes_.clear(); |
| } |
| context_.TryCancel(); |
| |
| { |
| // Acquire state lock |
| absl::MutexLock lk(&state_mtx_); |
| while (StreamState::Closed != state_) { |
| if (state_cv_.WaitWithTimeout(&state_mtx_, absl::Seconds(1))) { |
| SPDLOG_WARN("StreamState CondVar timed out before getting signalled: state={}", |
| static_cast<uint8_t>(state_)); |
| } |
| } |
| } |
| } |
| |
| /// Notifies the application that all operations associated with this RPC |
| /// have completed and all Holds have been removed. OnDone provides the RPC |
| /// status outcome for both successful and failed RPCs and will be called in |
| /// all cases. If it is not called, it indicates an application-level problem |
| /// (like failure to remove a hold). |
| /// |
| /// \param[in] status The status outcome of this RPC |
| void TelemetryBidiReactor::OnDone(const grpc::Status& status) { |
| SPDLOG_DEBUG("{}#OnDone, status.ok={}", peer_address_, status.ok()); |
| if (!status.ok()) { |
| SPDLOG_WARN("{}#OnDone, status.error_code={}, status.error_message={}, status.error_details={}", peer_address_, |
| status.error_code(), status.error_message(), status.error_details()); |
| } |
| { |
| absl::MutexLock lk(&state_mtx_); |
| state_ = StreamState::Closed; |
| state_cv_.SignalAll(); |
| } |
| |
| auto client = client_.lock(); |
| if (!client) { |
| return; |
| } |
| |
| if (client->active()) { |
| client->createSession(peer_address_, true); |
| } |
| } |
| |
| void TelemetryBidiReactor::OnReadInitialMetadataDone(bool ok) { |
| SPDLOG_DEBUG("{}#OnReadInitialMetadataDone", peer_address_); |
| |
| if (!ok) { |
| // for read stream |
| // Remove the hold corresponding to AddHold in TelemetryBidiReactor::TelemetryBidiReactor. |
| RemoveHold(); |
| |
| SPDLOG_DEBUG("Change state {} --> {}", static_cast<std::uint8_t>(state_), |
| static_cast<std::uint8_t>(StreamState::Closing)); |
| SPDLOG_WARN("Read of initial-metadata failed from {}", peer_address_); |
| signalClose(); |
| return; |
| } |
| |
| SPDLOG_DEBUG("Received initial metadata from {}", peer_address_); |
| } |
| |
| ROCKETMQ_NAMESPACE_END |