blob: a5247ac81b74b12d911bce28cfffd8dc280a9504 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "rocketmq/DefaultMQProducer.h"
#include <memory>
#include <mutex>
#include <system_error>
#include <utility>
#include "MQClientTest.h"
#include "ProducerImpl.h"
#include "rocketmq/CredentialsProvider.h"
#include "rocketmq/MQSelector.h"
ROCKETMQ_NAMESPACE_BEGIN
class DefaultMQProducerUnitTest : public MQClientTest {
public:
DefaultMQProducerUnitTest() : MQClientTest() {
credentials_provider_ = std::make_shared<StaticCredentialsProvider>(access_key_, access_secret_);
}
void SetUp() override {
MQClientTest::SetUp();
// More set-up
rpc_client_broker_ = std::make_shared<testing::NiceMock<RpcClientMock>>();
ON_CALL(*rpc_client_broker_, needHeartbeat()).WillByDefault(testing::Return(true));
ON_CALL(*rpc_client_broker_, ok()).WillByDefault(testing::Invoke([this]() { return client_ok_; }));
ON_CALL(*rpc_client_broker_, asyncHealthCheck)
.WillByDefault(testing::Invoke(
[this](const HealthCheckRequest& request, InvocationContext<HealthCheckResponse>* invocation_context) {
invocation_context->response.mutable_common()->mutable_status()->set_code(ok_);
invocation_context->onCompletion(true);
}));
ON_CALL(*rpc_client_broker_, asyncHeartbeat(testing::_, testing::_))
.WillByDefault(testing::Invoke(
[this](const HeartbeatRequest& request, InvocationContext<HeartbeatResponse>* invocation_context) {
invocation_context->response.mutable_common()->mutable_status()->set_code(ok_);
invocation_context->onCompletion(true);
}));
ON_CALL(*rpc_client_broker_, asyncSend)
.WillByDefault(testing::Invoke(
[this](const SendMessageRequest& request, InvocationContext<SendMessageResponse>* invocation_context) {
invocation_context->response.mutable_common()->mutable_status()->set_code(ok_);
invocation_context->response.set_message_id(message_id_);
invocation_context->onCompletion(true);
}));
const char* address_format = "ipv4:10.0.0.{}:10911";
for (int i = 0; i < partition_num_ / avg_partition_per_host_; ++i) {
std::string&& address = fmt::format(address_format, i);
client_instance_->addRpcClient(address, rpc_client_broker_);
}
}
void TearDown() override {
rpc_client_broker_.reset();
MQClientTest::TearDown();
}
protected:
absl::flat_hash_map<std::string, std::string> metadata_;
std::shared_ptr<testing::NiceMock<RpcClientMock>> rpc_client_broker_;
std::string message_id_{"msg_id_0"};
std::string access_key_{"access_key"};
std::string access_secret_{"access_secret"};
std::shared_ptr<CredentialsProvider> credentials_provider_;
std::string body_{"Test message body"};
};
/**
 * Sanity check of the mocked RPC clients: a route query against the
 * name-server client must complete with an OK status, and a send against the
 * broker mock must complete carrying the fixture's message id.
 */
TEST_F(DefaultMQProducerUnitTest, testBasicSetUp) {
  absl::Mutex mtx;
  absl::CondVar cv;
  bool completed = false;  // Guarded by mtx.

  QueryRouteRequest request;
  // NOTE(review): the context is heap-allocated and never deleted here;
  // presumably the completion path releases it - confirm.
  auto invocation_context = new InvocationContext<QueryRouteResponse>();
  auto callback = [&](const InvocationContext<QueryRouteResponse>* ctx) {
    // Non-fatal check: a fatal ASSERT_* would return from the lambda before
    // `completed` is set and leave the waiter below blocked forever.
    EXPECT_TRUE(ctx->status.ok());
    absl::MutexLock lk(&mtx);
    completed = true;
    cv.SignalAll();
  };
  invocation_context->callback = callback;
  rpc_client_ns_->asyncQueryRoute(request, invocation_context);
  {
    // Check the predicate under the lock so a signal delivered between the
    // check and the wait cannot be lost; loop to absorb spurious wakeups.
    absl::MutexLock lk(&mtx);
    while (!completed) {
      cv.Wait(&mtx);
    }
    // Re-arm the flag for the second round while still holding the lock.
    completed = false;
  }

  SendMessageRequest send_message_request;
  auto send_message_invocation_context = new InvocationContext<SendMessageResponse>();
  auto send_callback = [&](const InvocationContext<SendMessageResponse>* ctx) {
    EXPECT_TRUE(ctx->response.message_id() == message_id_);
    absl::MutexLock lk(&mtx);
    completed = true;
    cv.SignalAll();
  };
  send_message_invocation_context->callback = send_callback;
  rpc_client_broker_->asyncSend(send_message_request, send_message_invocation_context);
  {
    absl::MutexLock lk(&mtx);
    while (!completed) {
      cv.Wait(&mtx);
    }
  }
}
/**
 * SendCallback used by the asynchronous send test.
 *
 * It holds references to caller-owned synchronization state: on either
 * outcome it sets the completion flag and signals the condition variable
 * while holding the mutex; on success it also stores the message id.
 * The referenced objects must outlive the callback invocation.
 */
class UnitTestSendCallback : public SendCallback {
public:
  UnitTestSendCallback(absl::Mutex& mtx, absl::CondVar& cv, std::string& msg_id, bool& completed)
      : mtx_(mtx), cv_(cv), msg_id_(msg_id), completed_(completed) {
  }

  /** Records the message id of the send result and wakes up the waiter. */
  void onSuccess(SendResult& send_result) noexcept override {
    absl::MutexLock guard(&mtx_);
    msg_id_ = send_result.getMsgId();
    markCompletedLocked();
  }

  /** Wakes up the waiter without touching the message id; the error code is ignored. */
  void onFailure(const std::error_code& /*e*/) noexcept override {
    absl::MutexLock guard(&mtx_);
    markCompletedLocked();
  }

private:
  // Sets the completion flag and signals all waiters. Caller must hold mtx_.
  void markCompletedLocked() {
    completed_ = true;
    cv_.SignalAll();
  }

  absl::Mutex& mtx_;
  absl::CondVar& cv_;
  std::string& msg_id_;
  bool& completed_;
};
/**
 * Sends one message asynchronously through a started ProducerImpl and expects
 * the callback to report the message id produced by the mocked broker within
 * ten seconds.
 */
TEST_F(DefaultMQProducerUnitTest, testAsyncSendMessage) {
  auto producer = std::make_shared<ProducerImpl>(group_name_);
  producer->resourceNamespace(resource_namespace_);
  producer->withNameServerResolver(name_server_resolver_);
  producer->setCredentialsProvider(credentials_provider_);
  producer->start();

  MQMessage message;
  message.setTopic(topic_);
  message.setBody(body_);

  absl::Mutex mtx;
  absl::CondVar cv;
  bool completed = false;  // Guarded by mtx.
  std::string msg_id;      // Guarded by mtx.

  // NOTE(review): the callback is heap-allocated and not deleted here;
  // ownership after send() is not visible from this file - confirm.
  auto send_callback = new UnitTestSendCallback(mtx, cv, msg_id, completed);
  producer->send(message, send_callback);

  bool callback_fired = false;
  std::string received_msg_id;
  {
    // Evaluate the predicate under the lock and keep waiting until it holds
    // or the overall deadline passes, so neither a lost signal nor a spurious
    // wakeup can end the wait prematurely.
    absl::MutexLock lk(&mtx);
    const absl::Time deadline = absl::Now() + absl::Seconds(10);
    while (!completed) {
      if (cv.WaitWithDeadline(&mtx, deadline)) {
        break;  // Deadline expired.
      }
    }
    callback_fired = completed;
    received_msg_id = msg_id;
  }

  // Non-fatal checks so that the producer is always shut down below.
  EXPECT_TRUE(callback_fired) << "send callback was not invoked within 10 seconds";
  EXPECT_EQ(received_msg_id, message_id_);
  producer->shutdown();
}
/**
 * Sends one message synchronously through a started ProducerImpl and expects
 * no error code and the message id produced by the mocked broker.
 */
TEST_F(DefaultMQProducerUnitTest, testSendMessage) {
  auto producer = std::make_shared<ProducerImpl>(group_name_);
  producer->resourceNamespace(resource_namespace_);
  producer->withNameServerResolver(name_server_resolver_);
  producer->setCredentialsProvider(credentials_provider_);
  producer->start();

  MQMessage message;
  message.setTopic(topic_);
  message.setBody(body_);

  std::error_code ec;
  SendResult send_result = producer->send(message, ec);

  // Non-fatal checks: a fatal ASSERT_* here would return from the test body
  // and skip producer->shutdown() below.
  EXPECT_FALSE(ec) << ec.message();
  EXPECT_EQ(send_result.getMsgId(), message_id_);
  producer->shutdown();
}
/**
 * Isolates one broker endpoint on a started ProducerImpl, waits ten seconds
 * and expects the endpoint to no longer be reported as isolated.
 */
TEST_F(DefaultMQProducerUnitTest, testEndpointIsolation) {
  auto producer = std::make_shared<ProducerImpl>(group_name_);
  producer->resourceNamespace(resource_namespace_);
  producer->withNameServerResolver(name_server_resolver_);
  producer->setCredentialsProvider(credentials_provider_);
  producer->start();

  const char* isolated_endpoint = "ipv4:10.0.0.0:10911";
  producer->isolateEndpoint(isolated_endpoint);

  // NOTE(review): presumably a periodic health check (mocked to succeed by
  // the fixture) lifts the isolation within this window - confirm the
  // interval against ProducerImpl.
  absl::SleepFor(absl::Seconds(10));

  // Non-fatal check: a fatal ASSERT_* here would return from the test body
  // and skip producer->shutdown() below.
  EXPECT_FALSE(producer->isEndpointIsolated(isolated_endpoint));
  producer->shutdown();
}
ROCKETMQ_NAMESPACE_END