blob: 1f49725dc4b260abb441a52f96c62194ee55be06 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "TopicPublishInfo.h"
#include "ClientConfigImpl.h"
#include "LogInterceptorFactory.h"
#include "RpcClientImpl.h"
#include "Signature.h"
#include "TlsHelper.h"
#include "grpcpp/security/tls_credentials_options.h"
#include "rocketmq/MQMessageQueue.h"
#include "gtest/gtest.h"
ROCKETMQ_NAMESPACE_BEGIN
class TopicPublishInfoTest : public ::testing::Test {
protected:
TopicPublishInfoTest() : completion_queue_(std::make_shared<grpc::CompletionQueue>()) {
server_authorization_check_config_ = std::make_shared<grpc::experimental::TlsServerAuthorizationCheckConfig>(
std::make_shared<TlsServerAuthorizationChecker>());
tls_channel_credential_option_.set_server_verification_option(GRPC_TLS_SKIP_HOSTNAME_VERIFICATION);
std::vector<grpc::experimental::IdentityKeyCertPair> pem_list;
grpc::experimental::IdentityKeyCertPair pair{};
pair.private_key = TlsHelper::client_private_key;
pair.certificate_chain = TlsHelper::client_certificate_chain;
pem_list.emplace_back(pair);
certificate_provider_ =
std::make_shared<grpc::experimental::StaticDataCertificateProvider>(TlsHelper::CA, pem_list);
tls_channel_credential_option_.set_certificate_provider(certificate_provider_);
tls_channel_credential_option_.set_server_authorization_check_config(server_authorization_check_config_);
tls_channel_credential_option_.watch_root_certs();
tls_channel_credential_option_.watch_identity_key_cert_pairs();
channel_credential_ = grpc::experimental::TlsCredentials(tls_channel_credential_option_);
credentials_provider_ = std::make_shared<ConfigFileCredentialsProvider>();
}
void SetUp() override {
std::vector<std::string> name_server_list;
if (!name_server_list.empty()) {
target_ = *name_server_list.begin();
}
std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_factories;
client_config_.tenantId(tenant_id_);
client_config_.setCredentialsProvider(credentials_provider_);
interceptor_factories.emplace_back(absl::make_unique<LogInterceptorFactory>());
auto channel = grpc::experimental::CreateCustomChannelWithInterceptors(
target_, channel_credential_, channel_arguments_, std::move(interceptor_factories));
client_ = std::make_shared<rocketmq::RpcClientImpl>(completion_queue_, channel);
client_config_.resourceNamespace(resource_namespace_);
client_config_.setCredentialsProvider(std::make_shared<ConfigFileCredentialsProvider>());
client_config_.setIoTimeout(absl::Seconds(3));
Signature::sign(&client_config_, metadata_);
}
void TearDown() override {
completion_queue_->Shutdown();
}
std::string topic_{"yc001"};
std::string group_{"yc001"};
std::string resource_namespace_{"MQ_INST_1973281269661160_BXmPlOA6"};
std::string tenant_id_{"sample-tenant"};
std::string region_id_{"cn-hangzhou"};
std::string service_name_{"MQ"};
std::string target_{"dns:grpc.dev:9876"};
ClientConfigImpl client_config_{group_};
absl::flat_hash_map<std::string, std::string> metadata_;
std::shared_ptr<grpc::CompletionQueue> completion_queue_;
std::shared_ptr<RpcClientImpl> client_;
CredentialsProviderPtr credentials_provider_;
std::shared_ptr<grpc::experimental::CertificateProviderInterface> certificate_provider_;
grpc::experimental::TlsChannelCredentialsOptions tls_channel_credential_option_;
std::shared_ptr<grpc::experimental::TlsServerAuthorizationCheckConfig> server_authorization_check_config_;
std::shared_ptr<grpc::ChannelCredentials> channel_credential_;
grpc::ChannelArguments channel_arguments_;
};
TEST_F(TopicPublishInfoTest, testTopicPublishInfo) {
rmq::QueryRouteResponse response;
rmq::QueryRouteRequest request;
request.mutable_topic()->set_resource_namespace(resource_namespace_);
request.mutable_topic()->set_name(topic_);
auto invocation_context = new InvocationContext<QueryRouteResponse>();
invocation_context->context.set_deadline(std::chrono::system_clock::now() +
absl::ToChronoMilliseconds(client_config_.getIoTimeout()));
for (const auto& item : metadata_) {
invocation_context->context.AddMetadata(item.first, item.second);
}
auto callback = [this](const InvocationContext<QueryRouteResponse>* invocation_context) {
if (!invocation_context->status.ok()) {
std::cout << "code: " << invocation_context->status.error_code()
<< ", message: " << invocation_context->status.error_message() << std::endl;
} else {
std::cout << "Response debug string:" << invocation_context->response.DebugString() << std::endl;
}
ASSERT_TRUE(invocation_context->status.ok());
std::vector<Partition> partitions;
for (const auto& item : invocation_context->response.partitions()) {
Topic topic(resource_namespace_, topic_);
Permission permission;
switch (item.permission()) {
case rmq::Permission::READ:
permission = Permission::READ;
break;
case rmq::Permission::WRITE:
permission = Permission::WRITE;
break;
case rmq::Permission::READ_WRITE:
permission = Permission::READ_WRITE;
break;
default:
permission = Permission::NONE;
break;
}
AddressScheme scheme;
switch (item.broker().endpoints().scheme()) {
case rmq::AddressScheme::IPv4:
scheme = AddressScheme::IPv4;
break;
case rmq::AddressScheme::IPv6:
scheme = AddressScheme::IPv6;
break;
case rmq::AddressScheme::DOMAIN_NAME:
scheme = AddressScheme::DOMAIN_NAME;
break;
default:
scheme = AddressScheme::IPv4;
}
std::vector<Address> addresses;
for (const auto& host_port : item.broker().endpoints().addresses()) {
Address address(host_port.host(), host_port.port());
addresses.emplace_back(address);
}
ServiceAddress service_address(scheme, addresses);
Broker broker(item.broker().name(), item.broker().id(), service_address);
Partition partition(topic, item.id(), permission, broker);
partitions.emplace_back(partition);
}
auto topic_route_data =
std::make_shared<TopicRouteData>(partitions, invocation_context->response.common().DebugString());
rocketmq::TopicPublishInfo topic_publish_info(topic_, topic_route_data);
rocketmq::MQMessageQueue message_queue;
EXPECT_TRUE(topic_publish_info.selectOneMessageQueue(message_queue));
EXPECT_STREQ(topic_.c_str(), message_queue.getTopic().c_str());
};
invocation_context->callback = callback;
client_->asyncQueryRoute(request, invocation_context);
}
ROCKETMQ_NAMESPACE_END