blob: 5322f66e5a74553cdd916042aaefdfa96fa66a43 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <memory>
#include <system_error>
#include "ClientManagerFactory.h"
#include "ClientManagerMock.h"
#include "ProducerImpl.h"
#include "SchedulerImpl.h"
#include "StaticNameServerResolver.h"
#include "TopicRouteData.h"
#include "rocketmq/AsyncCallback.h"
#include "rocketmq/MQMessage.h"
#include "rocketmq/MQSelector.h"
#include "rocketmq/RocketMQ.h"
#include "rocketmq/SendResult.h"
ROCKETMQ_NAMESPACE_BEGIN
class ProducerImplTest : public testing::Test {
public:
ProducerImplTest() : credentials_provider_(std::make_shared<StaticCredentialsProvider>(access_key_, access_secret_)) {
}
void SetUp() override {
grpc_init();
name_server_resolver_ = std::make_shared<StaticNameServerResolver>(name_server_list_);
client_manager_ = std::make_shared<testing::NiceMock<ClientManagerMock>>();
ClientManagerFactory::getInstance().addClientManager(resource_namespace_, client_manager_);
producer_ = std::make_shared<ProducerImpl>(group_);
producer_->resourceNamespace(resource_namespace_);
producer_->withNameServerResolver(name_server_resolver_);
producer_->setCredentialsProvider(credentials_provider_);
{
std::vector<Partition> partitions;
Topic topic(resource_namespace_, topic_);
std::vector<Address> broker_addresses{Address(broker_host_, broker_port_)};
ServiceAddress service_address(AddressScheme::IPv4, broker_addresses);
Broker broker(broker_name_, broker_id_, service_address);
Partition partition(topic, queue_id_, Permission::READ_WRITE, broker);
partitions.emplace_back(partition);
std::string debug_string;
topic_route_data_ = std::make_shared<TopicRouteData>(partitions, debug_string);
}
}
void TearDown() override {
grpc_shutdown();
}
protected:
std::shared_ptr<testing::NiceMock<ClientManagerMock>> client_manager_;
std::shared_ptr<ProducerImpl> producer_;
std::string name_server_list_{"10.0.0.1:9876"};
std::shared_ptr<NameServerResolver> name_server_resolver_;
std::string resource_namespace_{"mq://test"};
std::string group_{"CID_test"};
std::string topic_{"Topic0"};
int queue_id_{1};
std::string tag_{"TagA"};
std::string key_{"key-0"};
std::string message_group_{"group-0"};
std::string broker_name_{"broker-a"};
int broker_id_{0};
std::string message_body_{"Message Body Content"};
std::string broker_host_{"10.0.0.1"};
int broker_port_{10911};
TopicRouteDataPtr topic_route_data_;
std::string access_key_{"access_key"};
std::string access_secret_{"access_secret"};
std::shared_ptr<CredentialsProvider> credentials_provider_;
};
TEST_F(ProducerImplTest, testStartShutdown) {
SchedulerImpl scheduler;
scheduler.start();
ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler));
producer_->start();
producer_->shutdown();
scheduler.shutdown();
}
TEST_F(ProducerImplTest, testSend) {
SchedulerImpl scheduler;
scheduler.start();
ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler));
auto mock_resolve_route =
[this](const std::string& target_host, const Metadata& metadata, const QueryRouteRequest& request,
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code& ec, const TopicRouteDataPtr& ptr)>& cb) {
std::error_code ec;
cb(ec, topic_route_data_);
};
EXPECT_CALL(*client_manager_, resolveRoute)
.Times(testing::AtLeast(1))
.WillRepeatedly(testing::Invoke(mock_resolve_route));
bool cb_invoked = false;
SendResult send_result;
auto mock_send = [&](const std::string& target_host, const Metadata& metadata, SendMessageRequest& request,
SendCallback* cb) {
cb->onSuccess(send_result);
cb_invoked = true;
return true;
};
EXPECT_CALL(*client_manager_, send).Times(testing::AtLeast(1)).WillRepeatedly(testing::Invoke(mock_send));
producer_->start();
MQMessage message(topic_, tag_, message_body_);
std::error_code ec;
producer_->send(message, ec);
EXPECT_FALSE(ec);
EXPECT_TRUE(cb_invoked);
producer_->shutdown();
scheduler.shutdown();
}
TEST_F(ProducerImplTest, testSend_WithMessageGroup) {
SchedulerImpl scheduler;
scheduler.start();
ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler));
auto mock_resolve_route =
[this](const std::string& target_host, const Metadata& metadata, const QueryRouteRequest& request,
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code& ec, const TopicRouteDataPtr& ptr)>& cb) {
std::error_code ec;
cb(ec, topic_route_data_);
};
EXPECT_CALL(*client_manager_, resolveRoute)
.Times(testing::AtLeast(1))
.WillRepeatedly(testing::Invoke(mock_resolve_route));
bool cb_invoked = false;
SendResult send_result;
auto mock_send = [&](const std::string& target_host, const Metadata& metadata, SendMessageRequest& request,
SendCallback* cb) {
cb->onSuccess(send_result);
cb_invoked = true;
return true;
};
EXPECT_CALL(*client_manager_, send).Times(testing::AtLeast(1)).WillRepeatedly(testing::Invoke(mock_send));
producer_->start();
MQMessage message(topic_, tag_, message_body_);
message.bindMessageGroup(message_group_);
std::error_code ec;
producer_->send(message, ec);
EXPECT_FALSE(ec);
EXPECT_TRUE(cb_invoked);
producer_->shutdown();
scheduler.shutdown();
}
TEST_F(ProducerImplTest, testSend_WithMessageQueueSelector) {
SchedulerImpl scheduler;
scheduler.start();
ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler));
auto mock_resolve_route =
[this](const std::string& target_host, const Metadata& metadata, const QueryRouteRequest& request,
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code& ec, const TopicRouteDataPtr& ptr)>& cb) {
std::error_code ec;
cb(ec, topic_route_data_);
};
EXPECT_CALL(*client_manager_, resolveRoute)
.Times(testing::AtLeast(1))
.WillRepeatedly(testing::Invoke(mock_resolve_route));
bool cb_invoked = false;
SendResult send_result;
auto mock_send = [&](const std::string& target_host, const Metadata& metadata, SendMessageRequest& request,
SendCallback* cb) {
cb->onSuccess(send_result);
cb_invoked = true;
return true;
};
EXPECT_CALL(*client_manager_, send).Times(testing::AtLeast(1)).WillRepeatedly(testing::Invoke(mock_send));
producer_->start();
MQMessage message(topic_, tag_, message_body_);
std::error_code ec;
auto&& list = producer_->listMessageQueue(topic_, ec);
EXPECT_FALSE(list.empty());
message.bindMessageQueue(list[0]);
producer_->send(message, ec);
EXPECT_TRUE(cb_invoked);
producer_->shutdown();
scheduler.shutdown();
}
class TestSendCallback : public SendCallback {
public:
TestSendCallback(bool& completed, absl::Mutex& mtx, absl::CondVar& cv) : completed_(completed), mtx_(mtx), cv_(cv) {
}
void onSuccess(SendResult& send_result) noexcept override {
absl::MutexLock lk(&mtx_);
completed_ = true;
cv_.SignalAll();
}
void onFailure(const std::error_code& ec) noexcept override {
absl::MutexLock lk(&mtx_);
completed_ = true;
cv_.SignalAll();
}
protected:
bool& completed_;
absl::Mutex& mtx_;
absl::CondVar& cv_;
};
TEST_F(ProducerImplTest, testAsyncSend) {
SchedulerImpl scheduler;
scheduler.start();
ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler));
auto mock_resolve_route =
[this](const std::string& target_host, const Metadata& metadata, const QueryRouteRequest& request,
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code& ec, const TopicRouteDataPtr& ptr)>& cb) {
std::error_code ec;
cb(ec, topic_route_data_);
};
EXPECT_CALL(*client_manager_, resolveRoute)
.Times(testing::AtLeast(1))
.WillRepeatedly(testing::Invoke(mock_resolve_route));
bool cb_invoked = false;
SendResult send_result;
auto mock_send = [&](const std::string& target_host, const Metadata& metadata, SendMessageRequest& request,
SendCallback* cb) {
cb->onSuccess(send_result);
cb_invoked = true;
return true;
};
EXPECT_CALL(*client_manager_, send).Times(testing::AtLeast(1)).WillRepeatedly(testing::Invoke(mock_send));
bool completed = false;
absl::Mutex mtx;
absl::CondVar cv;
auto send_callback = absl::make_unique<TestSendCallback>(completed, mtx, cv);
producer_->start();
MQMessage message(topic_, tag_, message_body_);
producer_->send(message, send_callback.get());
if (!completed) {
absl::MutexLock lk(&mtx);
cv.WaitWithDeadline(&mtx, absl::Now() + absl::Seconds(3));
}
EXPECT_TRUE(cb_invoked);
scheduler.shutdown();
producer_->shutdown();
}
ROCKETMQ_NAMESPACE_END