blob: 93064cb7cc99dfbdc19295fcf48fe85f05bbe80e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <functional>
#include <memory>
#include "google/rpc/code.pb.h"
#include "grpc/grpc.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "ClientManagerFactory.h"
#include "ClientManagerImpl.h"
#include "RpcClientMock.h"
#include "StaticNameServerResolver.h"
ROCKETMQ_NAMESPACE_BEGIN
class MQClientTest : public testing::Test {
public:
MQClientTest() = default;
void SetUp() override {
grpc_init();
name_server_list_.emplace_back(name_server_address_);
client_instance_ = std::make_shared<ClientManagerImpl>(resource_namespace_);
rpc_client_ns_ = std::make_shared<testing::NiceMock<RpcClientMock>>();
ON_CALL(*rpc_client_ns_, needHeartbeat()).WillByDefault(testing::Return(false));
ON_CALL(*rpc_client_ns_, ok()).WillByDefault(testing::Invoke([this]() { return client_ok_; }));
ON_CALL(*rpc_client_ns_, asyncQueryRoute(testing::_, testing::_))
.WillByDefault(testing::Invoke(
std::bind(&MQClientTest::mockQueryRoute, this, std::placeholders::_1, std::placeholders::_2)));
client_instance_->addRpcClient(name_server_address_, rpc_client_ns_);
ClientManagerFactory::getInstance().addClientManager(resource_namespace_, client_instance_);
name_server_resolver_ = std::make_shared<StaticNameServerResolver>(name_server_address_);
}
void TearDown() override {
rpc_client_ns_.reset();
client_instance_->cleanRpcClients();
client_instance_.reset();
grpc_shutdown();
}
protected:
std::shared_ptr<ClientManagerImpl> client_instance_;
std::shared_ptr<testing::NiceMock<RpcClientMock>> rpc_client_ns_;
const int16_t port_{10911};
const int32_t partition_num_{24};
const int32_t avg_partition_per_host_{8};
std::string name_server_address_{"ipv4:127.0.0.1:9876"};
std::shared_ptr<NameServerResolver> name_server_resolver_;
std::string group_name_{"CID_Test"};
std::string topic_{"Topic_Test"};
std::string resource_namespace_{"mq://test"};
google::rpc::Code ok_{google::rpc::Code::OK};
bool client_ok_{true};
std::vector<std::string> name_server_list_;
private:
void mockQueryRoute(const QueryRouteRequest& request,
InvocationContext<QueryRouteResponse>* invocation_context) const {
invocation_context->response.mutable_common()->mutable_status()->set_code(google::rpc::Code::OK);
auto partitions = invocation_context->response.mutable_partitions();
for (int i = 0; i < partition_num_; i++) {
auto partition = new rmq::Partition();
partition->set_id(i % avg_partition_per_host_);
partition->set_permission(rmq::Permission::READ_WRITE);
partition->mutable_topic()->set_name(request.topic().name());
partition->mutable_topic()->set_resource_namespace(request.topic().resource_namespace());
std::string broker_name{"broker-"};
broker_name.push_back('a' + i / avg_partition_per_host_);
partition->mutable_broker()->set_name(broker_name);
auto endpoint = partition->mutable_broker()->mutable_endpoints();
endpoint->set_scheme(rmq::AddressScheme::IPv4);
auto addresses = endpoint->mutable_addresses();
auto address = new rmq::Address;
address->set_host(fmt::format("10.0.0.{}", i / avg_partition_per_host_));
address->set_port(port_);
addresses->AddAllocated(address);
partitions->AddAllocated(partition);
}
// Mock invoke callback.
invocation_context->onCompletion(true);
}
};
ROCKETMQ_NAMESPACE_END