blob: dea0f5aba967757625aa5d89d328a41a6426dcad [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include <system_error>
#include <vector>
#include "gtest/gtest.h"
#include "Assignment.h"
#include "ClientManagerMock.h"
#include "ConsumeMessageServiceMock.h"
#include "ConsumeMessageType.h"
#include "InvocationContext.h"
#include "MessageAccessor.h"
#include "ProcessQueueImpl.h"
#include "PushConsumerMock.h"
#include "ReceiveMessageCallbackMock.h"
#include "ReceiveMessageResult.h"
#include "RpcClientMock.h"
#include "rocketmq/CredentialsProvider.h"
#include "rocketmq/MQMessageExt.h"
ROCKETMQ_NAMESPACE_BEGIN
class ProcessQueueTest : public testing::Test {
public:
void SetUp() override {
rpc_client_ = std::make_shared<testing::NiceMock<RpcClientMock>>();
message_queue_.serviceAddress(service_address_);
message_queue_.setTopic(topic_);
message_queue_.setBrokerName(broker_name_);
message_queue_.setQueueId(queue_id_);
client_manager_ = std::make_shared<testing::NiceMock<ClientManagerMock>>();
credentials_provider_ = std::make_shared<StaticCredentialsProvider>(access_key_, access_secret_);
consumer_ = std::make_shared<testing::NiceMock<PushConsumerMock>>();
auto consumer = std::dynamic_pointer_cast<PushConsumer>(consumer_);
consume_message_service_ = std::make_shared<testing::NiceMock<ConsumeMessageServiceMock>>();
ON_CALL(*consume_message_service_, messageListenerType)
.WillByDefault(testing::Return(MessageListenerType::STANDARD));
ON_CALL(*consumer_, getConsumeMessageService).WillByDefault(testing::Return(consume_message_service_));
process_queue_ = std::make_shared<ProcessQueueImpl>(message_queue_, filter_expression_, consumer, client_manager_);
receive_message_callback_ = std::make_shared<testing::NiceMock<ReceiveMessageCallbackMock>>();
process_queue_->callback(receive_message_callback_);
}
void TearDown() override {
}
protected:
std::string tenant_id_{"tenant-0"};
std::string access_key_{"ak"};
std::string access_secret_{"secret"};
std::shared_ptr<CredentialsProvider> credentials_provider_;
std::string group_name_{"TestGroup"};
std::string client_id_{"Client-0"};
std::string broker_name_{"broker-a"};
std::string region_{"cn-hangzhou"};
std::string service_name_{"MQ"};
int queue_id_{0};
std::string topic_{"TestTopic"};
std::string service_address_{"ipv4:10.0.0.1:10911"};
std::string tag_{"TagA"};
FilterExpression filter_expression_{tag_};
MQMessageQueue message_queue_;
std::shared_ptr<testing::NiceMock<RpcClientMock>> rpc_client_;
std::shared_ptr<testing::NiceMock<ClientManagerMock>> client_manager_;
std::shared_ptr<testing::NiceMock<PushConsumerMock>> consumer_;
std::shared_ptr<testing::NiceMock<ConsumeMessageServiceMock>> consume_message_service_;
std::shared_ptr<ProcessQueueImpl> process_queue_;
std::shared_ptr<testing::NiceMock<ReceiveMessageCallbackMock>> receive_message_callback_;
std::string resource_namespace_{"mq://test"};
std::string message_body_{"Sample body"};
uint32_t threshold_quantity_{32};
uint64_t threshold_memory_{4096};
uint32_t consume_batch_size_{8};
};
TEST_F(ProcessQueueTest, testBind) {
EXPECT_TRUE(process_queue_->bindFifoConsumeTask());
EXPECT_FALSE(process_queue_->bindFifoConsumeTask());
EXPECT_TRUE(process_queue_->unbindFifoConsumeTask());
EXPECT_FALSE(process_queue_->unbindFifoConsumeTask());
EXPECT_TRUE(process_queue_->bindFifoConsumeTask());
}
TEST_F(ProcessQueueTest, testExpired) {
EXPECT_FALSE(process_queue_->expired());
process_queue_->idle_since_ -= MixAll::PROCESS_QUEUE_EXPIRATION_THRESHOLD_;
EXPECT_TRUE(process_queue_->expired());
}
TEST_F(ProcessQueueTest, testShouldThrottle) {
EXPECT_CALL(*consumer_, maxCachedMessageQuantity)
.Times(testing::AtLeast(1))
.WillRepeatedly(testing::Return(threshold_quantity_));
EXPECT_CALL(*consumer_, maxCachedMessageMemory)
.Times(testing::AtLeast(1))
.WillRepeatedly(testing::Return(threshold_memory_));
EXPECT_FALSE(process_queue_->shouldThrottle());
}
TEST_F(ProcessQueueTest, testShouldThrottle_ByQuantity) {
std::vector<MQMessageExt> messages;
for (uint32_t i = 0; i < threshold_quantity_; i++) {
MQMessageExt message;
message.setTopic(topic_);
message.setTags(tag_);
MessageAccessor::setQueueId(message, 0);
MessageAccessor::setQueueOffset(message, i);
message.setBody(std::to_string(i));
messages.emplace_back(message);
}
process_queue_->cacheMessages(messages);
EXPECT_CALL(*consumer_, maxCachedMessageQuantity)
.Times(testing::AtLeast(1))
.WillRepeatedly(testing::Return(threshold_quantity_));
EXPECT_CALL(*consumer_, maxCachedMessageMemory)
.Times(testing::AtLeast(1))
.WillRepeatedly(testing::Return(threshold_memory_));
EXPECT_TRUE(process_queue_->shouldThrottle());
}
TEST_F(ProcessQueueTest, testShouldThrottle_ByMemory) {
std::vector<MQMessageExt> messages;
size_t body_length = 1024 * 4;
for (uint32_t i = 0; i < threshold_quantity_ / 2; i++) {
MQMessageExt message;
message.setTopic(topic_);
message.setTags(tag_);
MessageAccessor::setQueueId(message, 0);
MessageAccessor::setQueueOffset(message, i);
message.setBody(std::string(body_length, 'c'));
messages.emplace_back(message);
}
process_queue_->cacheMessages(messages);
EXPECT_CALL(*consumer_, maxCachedMessageQuantity)
.Times(testing::AtLeast(1))
.WillRepeatedly(testing::Return(threshold_quantity_));
EXPECT_CALL(*consumer_, maxCachedMessageMemory)
.Times(testing::AtLeast(1))
.WillRepeatedly(testing::Return(threshold_memory_));
EXPECT_TRUE(process_queue_->shouldThrottle());
}
TEST_F(ProcessQueueTest, testHasPendingMessages) {
EXPECT_FALSE(process_queue_->hasPendingMessages());
}
TEST_F(ProcessQueueTest, testHasPendingMessages2) {
std::vector<MQMessageExt> messages;
size_t body_length = 1024;
for (size_t i = 0; i < threshold_quantity_; i++) {
MQMessageExt message;
message.setTopic(topic_);
message.setTags(tag_);
MessageAccessor::setQueueId(message, 0);
MessageAccessor::setQueueOffset(message, i);
message.setBody(std::string(body_length, 'c'));
messages.emplace_back(message);
}
process_queue_->cacheMessages(messages);
EXPECT_TRUE(process_queue_->hasPendingMessages());
}
TEST_F(ProcessQueueTest, testTake) {
std::vector<MQMessageExt> messages;
EXPECT_FALSE(process_queue_->take(consume_batch_size_, messages));
EXPECT_TRUE(messages.empty());
}
TEST_F(ProcessQueueTest, testTake2) {
{
std::vector<MQMessageExt> messages;
size_t body_length = 1024;
for (size_t i = 0; i < threshold_quantity_; i++) {
MQMessageExt message;
message.setTopic(topic_);
message.setTags(tag_);
MessageAccessor::setQueueId(message, 0);
MessageAccessor::setQueueOffset(message, i);
message.setBody(std::string(body_length, 'c'));
messages.emplace_back(message);
}
process_queue_->cacheMessages(messages);
EXPECT_EQ(threshold_quantity_, process_queue_->cachedMessagesSize());
}
std::vector<MQMessageExt> msgs;
EXPECT_TRUE(process_queue_->take(consume_batch_size_, msgs));
EXPECT_FALSE(msgs.empty());
EXPECT_EQ(tag_, msgs.begin()->getTags());
EXPECT_EQ(topic_, msgs.begin()->getTopic());
EXPECT_EQ(threshold_quantity_ - consume_batch_size_, process_queue_->cachedMessagesSize());
}
TEST_F(ProcessQueueTest, testRelease) {
EXPECT_CALL(*consumer_, messageModel)
.Times(testing::AtLeast(1))
.WillRepeatedly(testing::Return(MessageModel::BROADCASTING));
int64_t offset;
EXPECT_FALSE(process_queue_->committedOffset(offset));
size_t body_length = 1024;
{
std::vector<MQMessageExt> messages;
for (size_t i = 0; i < threshold_quantity_; i++) {
MQMessageExt message;
message.setTopic(topic_);
message.setTags(tag_);
MessageAccessor::setQueueId(message, 0);
MessageAccessor::setQueueOffset(message, i);
message.setBody(std::string(body_length, 'c'));
messages.emplace_back(message);
}
process_queue_->cacheMessages(messages);
EXPECT_EQ(threshold_quantity_, process_queue_->cachedMessagesSize());
}
std::vector<MQMessageExt> msgs;
process_queue_->take(1, msgs);
EXPECT_TRUE(process_queue_->committedOffset(offset));
EXPECT_EQ(0, offset);
process_queue_->release(body_length, 0);
EXPECT_TRUE(process_queue_->committedOffset(offset));
EXPECT_EQ(1, offset);
}
TEST_F(ProcessQueueTest, testOffset) {
EXPECT_CALL(*consumer_, messageModel)
.Times(testing::AtLeast(1))
.WillRepeatedly(testing::Return(MessageModel::BROADCASTING));
int64_t offset;
EXPECT_FALSE(process_queue_->committedOffset(offset));
size_t body_length = 1024;
{
std::vector<MQMessageExt> messages;
for (size_t i = 0; i < threshold_quantity_; i++) {
MQMessageExt message;
message.setTopic(topic_);
message.setTags(tag_);
MessageAccessor::setQueueId(message, 0);
MessageAccessor::setQueueOffset(message, i);
message.setBody(std::string(body_length, 'c'));
messages.emplace_back(message);
}
process_queue_->cacheMessages(messages);
EXPECT_EQ(threshold_quantity_, process_queue_->cachedMessagesSize());
}
std::vector<MQMessageExt> msgs;
process_queue_->take(threshold_quantity_, msgs);
EXPECT_TRUE(process_queue_->committedOffset(offset));
EXPECT_EQ(0, offset);
for (size_t i = 0; i < threshold_quantity_; i++) {
process_queue_->release(body_length, i);
}
EXPECT_TRUE(process_queue_->committedOffset(offset));
EXPECT_EQ(threshold_quantity_, offset);
}
TEST_F(ProcessQueueTest, testReceiveMessage_POP) {
EXPECT_CALL(*consumer_, tenantId).WillRepeatedly(testing::ReturnRef(tenant_id_));
EXPECT_CALL(*consumer_, resourceNamespace).WillRepeatedly(testing::ReturnRef(resource_namespace_));
EXPECT_CALL(*consumer_, credentialsProvider).WillRepeatedly(testing::Return(credentials_provider_));
EXPECT_CALL(*consumer_, region).WillRepeatedly(testing::ReturnRef(region_));
EXPECT_CALL(*consumer_, serviceName).WillRepeatedly(testing::ReturnRef(service_name_));
EXPECT_CALL(*consumer_, clientId).WillRepeatedly(testing::Return(client_id_));
EXPECT_CALL(*consumer_, getGroupName).WillRepeatedly(testing::ReturnRef(group_name_));
EXPECT_CALL(*consumer_, getLongPollingTimeout).WillRepeatedly(testing::Return(absl::Seconds(3)));
auto optional = absl::make_optional(filter_expression_);
EXPECT_CALL(*consumer_, getFilterExpression).WillRepeatedly(testing::Return(optional));
EXPECT_CALL(*consumer_, receiveBatchSize).WillRepeatedly(testing::Return(threshold_quantity_));
EXPECT_CALL(*consumer_, messageModel).WillRepeatedly(testing::Return(MessageModel::CLUSTERING));
auto receive_message_mock = [this](const std::string& target, const Metadata& metadata,
const ReceiveMessageRequest& request, std::chrono::milliseconds timeout,
const std::shared_ptr<ReceiveMessageCallback>& cb) {
std::error_code ec;
ReceiveMessageResult result;
for (size_t i = 0; i < threshold_quantity_; i++) {
MQMessageExt message;
message.setTopic(topic_);
message.setTags(tag_);
message.setBody(message_body_);
MessageAccessor::setQueueId(message, queue_id_);
MessageAccessor::setQueueOffset(message, i);
result.messages.emplace_back(message);
}
cb->onCompletion(ec, result);
};
EXPECT_CALL(*client_manager_, receiveMessage)
.Times(testing::AtLeast(1))
.WillRepeatedly(testing::Invoke(receive_message_mock));
process_queue_->receiveMessage();
}
TEST_F(ProcessQueueTest, testReceiveMessage_Pull) {
EXPECT_CALL(*consumer_, tenantId).WillRepeatedly(testing::ReturnRef(tenant_id_));
EXPECT_CALL(*consumer_, resourceNamespace).WillRepeatedly(testing::ReturnRef(resource_namespace_));
EXPECT_CALL(*consumer_, credentialsProvider).WillRepeatedly(testing::Return(credentials_provider_));
EXPECT_CALL(*consumer_, region).WillRepeatedly(testing::ReturnRef(region_));
EXPECT_CALL(*consumer_, serviceName).WillRepeatedly(testing::ReturnRef(service_name_));
EXPECT_CALL(*consumer_, clientId).WillRepeatedly(testing::Return(client_id_));
EXPECT_CALL(*consumer_, getGroupName).WillRepeatedly(testing::ReturnRef(group_name_));
EXPECT_CALL(*consumer_, getLongPollingTimeout).WillRepeatedly(testing::Return(absl::Seconds(3)));
auto optional = absl::make_optional(filter_expression_);
EXPECT_CALL(*consumer_, getFilterExpression).WillRepeatedly(testing::Return(optional));
EXPECT_CALL(*consumer_, receiveBatchSize).WillRepeatedly(testing::Return(threshold_quantity_));
std::error_code ec;
ReceiveMessageResult result;
auto pull_message_mock = [&](const std::string& target_host, const Metadata& metadata,
const PullMessageRequest& request, std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&, const ReceiveMessageResult&)>& cb) {
cb(ec, result);
};
EXPECT_CALL(*client_manager_, pullMessage)
.Times(testing::AtLeast(1))
.WillRepeatedly(testing::Invoke(pull_message_mock));
process_queue_->receiveMessage();
}
ROCKETMQ_NAMESPACE_END