blob: 77d9a8c6ace0d79e39a23fa30b99e907da823d52 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "RpcClientMock.h"
#include "google/rpc/code.pb.h"
#include "grpcpp/impl/grpc_library.h"
#include "gtest/gtest.h"
ROCKETMQ_NAMESPACE_BEGIN
namespace ut {
class RpcClientTest : public testing::Test {
public:
void SetUp() override {
grpc::internal::GrpcLibraryInitializer initializer;
}
static void mockQueryRouteInfo(const QueryRouteRequest& request,
InvocationContext<QueryRouteResponse>* invocation_context) {
invocation_context->response.mutable_common()->mutable_status()->set_code(google::rpc::Code::OK);
for (int i = 0; i < 3; ++i) {
auto partition = new rmq::Partition;
partition->mutable_topic()->set_name(request.topic().name());
partition->mutable_broker()->set_name(fmt::format("broker-{}", i));
partition->mutable_broker()->set_id(0);
auto endpoint = partition->mutable_broker()->mutable_endpoints();
auto address = new rmq::Address;
address->set_host(fmt::format("10.0.0.{}", i));
address->set_port(10911);
endpoint->mutable_addresses()->AddAllocated(address);
invocation_context->response.mutable_partitions()->AddAllocated(partition);
}
invocation_context->onCompletion(true);
}
};
TEST_F(RpcClientTest, testMockedGetRouteInfo) {
RpcClientMock rpc_client_mock;
ON_CALL(rpc_client_mock, asyncQueryRoute(testing::_, testing::_)).WillByDefault(testing::Invoke(mockQueryRouteInfo));
std::string topic = "sample_topic";
QueryRouteRequest request;
request.mutable_topic()->set_name(topic);
absl::flat_hash_map<std::string, std::string> metadata;
auto invocation_context = new InvocationContext<QueryRouteResponse>();
absl::Mutex mtx;
absl::CondVar cv;
bool completed = false;
auto callback = [&](const InvocationContext<QueryRouteResponse>* invocation_context) {
EXPECT_TRUE(invocation_context->status.ok());
EXPECT_EQ(google::rpc::Code::OK, invocation_context->response.common().status().code());
EXPECT_EQ(3, invocation_context->response.partitions().size());
absl::MutexLock lk(&mtx);
completed = true;
cv.SignalAll();
};
invocation_context->callback = callback;
rpc_client_mock.asyncQueryRoute(request, invocation_context);
while (!completed) {
absl::MutexLock lk(&mtx);
cv.Wait(&mtx);
}
}
} // namespace ut
ROCKETMQ_NAMESPACE_END