blob: 051f50d89fd3e32ea8c8bf64af60564f218a5d4b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <iostream>
#include <thread>
#include <unordered_map>
#include "absl/container/flat_hash_set.h"
#include "apache/rocketmq/v1/service.pb.h"
#include "google/rpc/code.pb.h"
#include "spdlog/spdlog.h"
#include "gtest/gtest.h"
#include "ClientConfigImpl.h"
#include "InvocationContext.h"
#include "LogInterceptorFactory.h"
#include "MixAll.h"
#include "RpcClientImpl.h"
#include "Signature.h"
#include "TlsHelper.h"
#include "UniqueIdGenerator.h"
#include "UtilAll.h"
#include "rocketmq/CredentialsProvider.h"
#include "rocketmq/Logger.h"
using namespace testing;
ROCKETMQ_NAMESPACE_BEGIN
class RpcClientTest : public ::testing::Test {
protected:
RpcClientTest() : completion_queue_(std::make_shared<grpc::CompletionQueue>()) {
server_authorization_check_config_ = std::make_shared<grpc::experimental::TlsServerAuthorizationCheckConfig>(
std::make_shared<TlsServerAuthorizationChecker>());
std::vector<grpc::experimental::IdentityKeyCertPair> pem_list;
grpc::experimental::IdentityKeyCertPair pair{};
pair.private_key = TlsHelper::client_private_key;
pair.certificate_chain = TlsHelper::client_certificate_chain;
pem_list.emplace_back(pair);
certificate_provider_ =
std::make_shared<grpc::experimental::StaticDataCertificateProvider>(TlsHelper::CA, pem_list);
tls_channel_credential_options_.set_certificate_provider(certificate_provider_);
tls_channel_credential_options_.set_server_verification_option(GRPC_TLS_SKIP_ALL_SERVER_VERIFICATION);
tls_channel_credential_options_.set_server_authorization_check_config(server_authorization_check_config_);
tls_channel_credential_options_.watch_root_certs();
tls_channel_credential_options_.watch_identity_key_cert_pairs();
channel_credential_ = grpc::experimental::TlsCredentials(tls_channel_credential_options_);
credentials_provider_ = std::make_shared<ConfigFileCredentialsProvider>();
client_config_.tenantId(tenant_id_);
client_config_.setCredentialsProvider(credentials_provider_);
}
void SetUp() override {
getLogger().setLevel(Level::Debug);
client_config_.setCredentialsProvider(std::make_shared<ConfigFileCredentialsProvider>());
client_config_.resourceNamespace(resource_namespace_);
client_config_.region(region_id_);
client_config_.tenantId(tenant_id_);
Signature::sign(&client_config_, metadata_);
std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_factories;
interceptor_factories.emplace_back(absl::make_unique<LogInterceptorFactory>());
name_server_channel_ = grpc::experimental::CreateCustomChannelWithInterceptors(
name_server_target_, channel_credential_, channel_arguments_, std::move(interceptor_factories));
}
void TearDown() override {
}
~RpcClientTest() override {
completion_queue_->Shutdown();
}
bool brokerEndpoint(const std::string& topic, std::string& endpoint) {
QueryRouteRequest route_request;
route_request.mutable_topic()->set_name(topic.data());
QueryRouteResponse route_response;
RpcClientImpl name_server_client(completion_queue_, name_server_channel_);
absl::Mutex mtx;
absl::CondVar cv;
bool completed = false;
auto callback = [&](const InvocationContext<QueryRouteResponse>* invocation_context) {
ASSERT_TRUE(invocation_context->status.ok());
std::cout << "Route Response:" << route_response.DebugString() << std::endl;
absl::MutexLock lk(&mtx);
route_response = invocation_context->response;
completed = true;
cv.SignalAll();
};
auto invocation_context = new InvocationContext<QueryRouteResponse>();
invocation_context->callback = callback;
invocation_context->context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(3));
for (const auto& item : metadata_) {
invocation_context->context.AddMetadata(item.first, item.second);
}
name_server_client.asyncQueryRoute(route_request, invocation_context);
absl::flat_hash_set<std::string> broker_addresses;
for (auto& partition : route_response.partitions()) {
auto& broker = partition.broker();
if (MixAll::MASTER_BROKER_ID == broker.id()) {
for (auto& item : broker.endpoints().addresses()) {
std::string connect_string(fmt::format("{}:{}", item.host(), item.port()));
if (!broker_addresses.contains(connect_string)) {
broker_addresses.insert(connect_string);
}
}
} else {
for (const auto& item : broker.endpoints().addresses()) {
SPDLOG_WARN("Unexpected endpoint[{}:{}] with brokerId={}", item.host(), item.port(), broker.id());
}
}
}
if (broker_addresses.empty()) {
return false;
}
endpoint = *broker_addresses.begin();
return true;
}
void fillSendMessageRequest(SendMessageRequest& send_message_request) {
send_message_request.mutable_message()->mutable_topic()->set_resource_namespace(resource_namespace_);
send_message_request.mutable_message()->mutable_topic()->set_name(topic_);
std::unordered_map<std::string, std::string> props;
props["key"] = "value";
props["Jack"] = "Bauer";
send_message_request.mutable_message()->mutable_user_attribute()->insert(props.begin(), props.end());
auto system_attribute = send_message_request.mutable_message()->mutable_system_attribute();
system_attribute->set_message_id(message_id_);
system_attribute->set_message_type(rmq::MessageType::NORMAL);
system_attribute->set_body_encoding(rmq::Encoding::IDENTITY);
system_attribute->set_born_host(UtilAll::hostname());
system_attribute->set_tag("TagA");
send_message_request.mutable_message()->set_body("Example data");
}
RpcClientSharedPtr brokerRpcClient() {
std::string broker_endpoint;
bool success = brokerEndpoint(topic_, broker_endpoint);
if (!success) {
return nullptr;
}
SPDLOG_INFO("Target broker address: {}", broker_endpoint);
std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_factories;
interceptor_factories.emplace_back(absl::make_unique<LogInterceptorFactory>());
auto broker_channel = grpc::experimental::CreateCustomChannelWithInterceptors(
broker_endpoint, channel_credential_, channel_arguments_, std::move(interceptor_factories));
auto client = std::make_shared<RpcClientImpl>(completion_queue_, broker_channel);
return client;
}
void fillHeartbeatRequest(HeartbeatRequest& heartbeat_request) {
heartbeat_request.set_client_id("client_id_0");
auto consumer_data = heartbeat_request.mutable_consumer_data();
consumer_data->mutable_group()->set_resource_namespace(resource_namespace_);
consumer_data->mutable_group()->set_name(topic_);
auto subscription_entry = new rmq::SubscriptionEntry;
subscription_entry->mutable_topic()->set_name(topic_);
subscription_entry->mutable_topic()->set_resource_namespace(resource_namespace_);
subscription_entry->mutable_expression()->set_type(rmq::FilterType::TAG);
subscription_entry->mutable_expression()->set_expression("*");
consumer_data->mutable_subscriptions()->AddAllocated(subscription_entry);
heartbeat_request.set_fifo_flag(false);
}
std::string name_server_target_{"47.98.116.189:80"};
std::shared_ptr<grpc::CompletionQueue> completion_queue_;
std::shared_ptr<grpc::Channel> name_server_channel_;
std::string topic_{"cpp_sdk_standard"};
std::string group_{"GID_cpp_sdk_standard"};
std::string resource_namespace_{"MQ_INST_1080056302921134_BXuIbML7"};
std::string tenant_id_{"sample-tenant"};
std::string region_id_{"cn-hangzhou"};
std::string service_name_{"MQ"};
absl::flat_hash_map<std::string, std::string> metadata_;
ClientConfigImpl client_config_{group_};
CredentialsProviderPtr credentials_provider_;
std::shared_ptr<grpc::experimental::StaticDataCertificateProvider> certificate_provider_;
grpc::experimental::TlsChannelCredentialsOptions tls_channel_credential_options_;
std::shared_ptr<grpc::experimental::TlsServerAuthorizationCheckConfig> server_authorization_check_config_;
std::shared_ptr<grpc::ChannelCredentials> channel_credential_;
grpc::ChannelArguments channel_arguments_;
std::string message_id_{UniqueIdGenerator::instance().next()};
};
TEST_F(RpcClientTest, testRouteInfo) {
RpcClientImpl client(completion_queue_, name_server_channel_);
QueryRouteRequest request;
request.mutable_topic()->set_name(topic_);
request.mutable_topic()->set_resource_namespace(resource_namespace_);
auto invocation_context = new InvocationContext<QueryRouteResponse>();
bool completed = false;
absl::Mutex mtx;
absl::CondVar cv;
auto callback = [&](const InvocationContext<QueryRouteResponse>* invocation_context) {
if (!invocation_context->status.ok()) {
SPDLOG_ERROR("Status not OK");
}
ASSERT_TRUE(invocation_context->status.ok());
EXPECT_TRUE(google::rpc::Code::OK == invocation_context->response.common().status().code());
EXPECT_FALSE(invocation_context->response.partitions().empty());
{
absl::MutexLock lk(&mtx);
cv.SignalAll();
}
};
invocation_context->callback = callback;
invocation_context->context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(3));
for (const auto& item : metadata_) {
invocation_context->context.AddMetadata(item.first, item.second);
}
client.asyncQueryRoute(request, invocation_context);
while (!completed) {
absl::MutexLock lk(&mtx);
cv.Wait(&mtx);
}
}
TEST_F(RpcClientTest, DISABLED_testSendMessageAsync) {
SendMessageRequest request;
fillSendMessageRequest(request);
auto client = brokerRpcClient();
auto context = new InvocationContext<SendMessageResponse>();
for (const auto& entry : metadata_) {
context->context.AddMetadata(entry.first, entry.second);
}
context->callback = [](const InvocationContext<SendMessageResponse>* invocation_context) {
if ((!invocation_context->status.ok())) {
std::cout << "error code: " << invocation_context->status.error_code()
<< ", error message: " << invocation_context->status.error_message() << std::endl;
}
ASSERT_TRUE(invocation_context->status.ok());
};
client->asyncSend(request, context);
std::thread th([&]() {
InvocationContext<SendMessageResponse>* ctx;
bool ok = false;
completion_queue_->Next(reinterpret_cast<void**>(&ctx), &ok);
if (ok) {
ctx->onCompletion(ok);
}
});
if (th.joinable()) {
th.join();
}
}
TEST_F(RpcClientTest, DISABLED_testHeartbeat) {
auto client = brokerRpcClient();
HeartbeatRequest heartbeat_request;
HeartbeatResponse response;
fillHeartbeatRequest(heartbeat_request);
absl::Mutex mtx;
absl::CondVar cv;
bool completed = false;
auto invocation_context = new InvocationContext<HeartbeatResponse>();
for (const auto& entry : metadata_) {
invocation_context->context.AddMetadata(entry.first, entry.second);
}
invocation_context->callback = [&](const InvocationContext<HeartbeatResponse>* invocation_context) {
ASSERT_TRUE(invocation_context->status.ok());
EXPECT_TRUE(google::rpc::Code::OK == invocation_context->response.common().status().code());
{
completed = true;
absl::MutexLock lk(&mtx);
cv.SignalAll();
}
};
client->asyncHeartbeat(heartbeat_request, invocation_context);
while (!completed) {
absl::MutexLock lk(&mtx);
cv.Wait(&mtx);
}
}
TEST_F(RpcClientTest, DISABLED_testQueryAssignment) {
auto client = brokerRpcClient();
QueryAssignmentRequest request;
request.mutable_topic()->set_name(topic_);
request.mutable_topic()->set_resource_namespace(resource_namespace_);
request.mutable_group()->set_resource_namespace(resource_namespace_);
request.mutable_group()->set_name(group_);
QueryAssignmentResponse response;
auto invocation_context = new InvocationContext<QueryAssignmentResponse>();
for (const auto& entry : metadata_) {
invocation_context->context.AddMetadata(entry.first, entry.second);
}
bool completed = false;
absl::Mutex mtx;
absl::CondVar cv;
auto callback = [&](const InvocationContext<QueryAssignmentResponse>* invocation_context) {
ASSERT_TRUE(invocation_context->status.ok());
ASSERT_FALSE(invocation_context->response.assignments().empty());
completed = true;
{
absl::MutexLock lk(&mtx);
cv.SignalAll();
}
};
invocation_context->callback = callback;
client->asyncQueryAssignment(request, invocation_context);
while (!completed) {
absl::MutexLock lk(&mtx);
cv.Wait(&mtx);
}
}
TEST_F(RpcClientTest, DISABLED_testHealthCheck) {
auto client = brokerRpcClient();
HealthCheckRequest request;
const std::string& client_host = UtilAll::hostname();
request.set_client_host(client_host);
HealthCheckResponse response;
auto invocation_context = new InvocationContext<HealthCheckResponse>();
for (const auto& entry : metadata_) {
invocation_context->context.AddMetadata(entry.first, entry.second);
}
absl::Mutex mtx;
absl::CondVar cv;
bool completed = false;
invocation_context->callback = [&](const InvocationContext<HealthCheckResponse>* invocation_context) {
{
absl::MutexLock lk(&mtx);
completed = true;
cv.SignalAll();
}
EXPECT_TRUE(invocation_context->status.ok());
EXPECT_EQ(google::rpc::Code::OK, invocation_context->response.common().status().code());
};
client->asyncHealthCheck(request, invocation_context);
while (!completed) {
absl::MutexLock lk(&mtx);
cv.Wait(&mtx);
}
}
ROCKETMQ_NAMESPACE_END