blob: 1b0334ae796a63e9be42b6f1873e01a62b31a898 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "utils/VerifyUtils.h"
#include "utils/data/collect/DataCollector.h"
#include "utils/data/collect/DataCollectorManager.h"
#include "utils/SimpleConcurrentHashMapUtils.h"
#include "gtest/gtest.h"
#include "resource/Resource.h"
#include "spdlog/logger.h"
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <future>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>
// Logger used by the helpers in this file for info/error output; defined in another translation unit.
extern std::shared_ptr<spdlog::logger> multi_logger;
// Shared Resource handle defined in another translation unit (not referenced by the functions below).
extern std::shared_ptr<Resource> resource;
// Out-of-line definition of the static counter VerifyUtils::receivedIndex, initialised to 0.
std::atomic<int> VerifyUtils::receivedIndex(0);
// Translates a delay level into its delay in milliseconds.
//
// Levels 1..18 map to: 1s, 5s, 10s, 30s, 1m..10m (one-minute steps),
// 20m, 30m, 1h, 2h. Any other level yields 0.
long long VerifyUtils::getDelayTime(int delayLevel)
{
    const long long kSecond = 1000;
    const long long kMinute = 60 * kSecond;
    const long long kHour = 60 * kMinute;
    // Entry i holds the delay for level i + 1.
    const long long delayByLevel[] = {
        1 * kSecond, 5 * kSecond, 10 * kSecond, 30 * kSecond,
        1 * kMinute, 2 * kMinute, 3 * kMinute, 4 * kMinute, 5 * kMinute,
        6 * kMinute, 7 * kMinute, 8 * kMinute, 9 * kMinute, 10 * kMinute,
        20 * kMinute, 30 * kMinute,
        1 * kHour, 2 * kHour};
    const int levelCount = static_cast<int>(sizeof(delayByLevel) / sizeof(delayByLevel[0]));
    if (delayLevel < 1 || delayLevel > levelCount)
    {
        return 0;
    }
    return delayByLevel[delayLevel - 1];
}
// Finds received messages whose observed delay deviates from the expected
// delay of `delayLevel` by more than 5000 ms.
//
// The "consume time" is the wall-clock time at which this function runs,
// not the time each message was actually received, so it should be called
// right after the messages have arrived.
//
// @param dequeueMessages collector holding the received messages
// @param delayLevel      delay level passed to getDelayTime()
// @return map of message id -> observed interval (ms) for every message
//         outside the tolerance; empty when all messages are acceptable
std::unordered_map<std::string, long> VerifyUtils::checkDelay(DataCollector<MQMsg> &dequeueMessages, int delayLevel)
{
    std::unordered_map<std::string, long> outOfTolerance;
    std::vector<MQMsg> receivedMessages = dequeueMessages.getAllData();
    // Take the timestamp directly in milliseconds. Casting to whole seconds
    // first (as before) silently dropped up to 999 ms of precision.
    const long long consumeTime = std::chrono::duration_cast<std::chrono::milliseconds>(
                                      std::chrono::system_clock::now().time_since_epoch())
                                      .count();
    // Loop-invariant: the expected delay depends only on the level.
    const long long expectedDelay = getDelayTime(delayLevel);
    const long long toleranceMillis = 5000;
    for (auto &msg : receivedMessages)
    {
        const long long interval = consumeTime - static_cast<long long>(msg.getBornTimestamp());
        const long long deviation = interval - expectedDelay;
        if (deviation > toleranceMillis || deviation < -toleranceMillis)
        {
            outOfTolerance.insert(std::make_pair(msg.getMsgId(), static_cast<long>(interval)));
        }
    }
    return outOfTolerance;
}
// Groups the received messages by sharding key (integer body modulo 2,
// rendered as a string) and delegates the ordering check to
// checkOrderMessage().
//
// @param dequeueMessages collector holding the received messages; every
//                        body must parse as an int (std::stoi throws otherwise)
// @return the result of checkOrderMessage() on the grouped messages
bool VerifyUtils::checkOrder(DataCollector<MQMsg> &dequeueMessages)
{
    std::vector<MQMsg> snapshot = dequeueMessages.getAllData();
    std::unordered_map<std::string, std::vector<MQMsg>> groupedByShard;
    for (const MQMsg &message : snapshot)
    {
        const int shard = std::stoi(message.getBody()) % 2;
        // operator[] creates the (empty) bucket on first use.
        groupedByShard[std::to_string(shard)].push_back(message);
    }
    return checkOrderMessage(groupedByShard);
}
bool async_function(const std::string &topic, const std::string &subExpression, std::shared_ptr<rocketmq::DefaultMQPullConsumer> pullConsumer)
{
std::vector<rocketmq::MQMessageQueue> mqs;
try
{
pullConsumer->fetchSubscribeMessageQueues(topic, mqs);
for (auto &mq : mqs)
{
long long offset = pullConsumer->fetchConsumeOffset(mq, true);
if (offset < 0)
continue;
rocketmq::PullResult pullResult = pullConsumer->pull(mq, subExpression, offset, 32);
switch (pullResult.pullStatus)
{
case rocketmq::FOUND:
for (auto &msg : pullResult.msgFoundList)
{
multi_logger->info("Message: {}", msg.toString());
}
offset = pullResult.nextBeginOffset;
pullConsumer->updateConsumeOffset(mq, offset);
break;
case rocketmq::NO_MATCHED_MSG:
break;
case rocketmq::NO_NEW_MSG:
break;
case rocketmq::OFFSET_ILLEGAL:
break;
default:
break;
}
}
}
catch (const rocketmq::MQException &e)
{
multi_logger->error("fetchSubscribeMessageQueues exception: {}", e.what());
return false;
}
return true;
}
// Runs one pull round (async_function) on a separate thread and waits up
// to 30 seconds for it.
//
// @return true only when the round finished within the wait and reported
//         success; false on timeout or failure
//
// NOTE(review): a future obtained from std::async blocks in its destructor
// until the task completes, so on timeout this function may still not
// return before the pull round ends - confirm whether that is acceptable.
bool VerifyUtils::tryReceiveOnce(const std::string &topic, const std::string &subExpression, std::shared_ptr<rocketmq::DefaultMQPullConsumer> pullConsumer)
{
    std::future<bool> pending = std::async(std::launch::async, [topic, subExpression, pullConsumer]()
                                           { return async_function(topic, subExpression, pullConsumer); });
    if (pending.wait_for(std::chrono::seconds(30)) != std::future_status::ready)
    {
        return false;
    }
    return pending.get();
}
// Pulls the not-yet-consumed messages of every queue of `topic` (up to 32
// per queue, empty sub-expression), appends them to `msgs`, advances each
// queue's consume offset, and returns `msgs`.
// Callers use the result to check, for example, that exactly one message
// was fetched and whether its body is empty.
//
// NOTE(review): `msgs` is not declared inside this function; presumably it
// is a VerifyUtils member declared in the header - confirm, since results
// would then accumulate across calls.
std::vector<rocketmq::MQMessageExt> VerifyUtils::fetchMessages(std::shared_ptr<rocketmq::DefaultMQPullConsumer> pullConsumer, const std::string &topic)
{
    std::vector<rocketmq::MQMessageQueue> queues;
    pullConsumer->fetchSubscribeMessageQueues(topic, queues);
    for (auto &queue : queues)
    {
        const long long startOffset = pullConsumer->fetchConsumeOffset(queue, true);
        if (startOffset < 0)
        {
            continue;
        }
        rocketmq::PullResult pulled = pullConsumer->pull(queue, "", startOffset, 32);
        // Statuses other than FOUND carry nothing to collect.
        if (pulled.pullStatus != rocketmq::FOUND)
        {
            continue;
        }
        for (auto &msg : pulled.msgFoundList)
        {
            msgs.push_back(msg);
        }
        const long long nextOffset = pulled.nextBeginOffset;
        pullConsumer->updateConsumeOffset(queue, nextOffset);
    }
    return msgs;
}
// Polls `dequeueMessages` every 500 ms until each id in `enqueueMessages`
// has been received `consumedTimes` times, or until `timeoutMills` elapse.
//
// An id received more often than `consumedTimes` is logged and trips
// assert(false); it is then treated as consumed.
//
// @param enqueueMessages collector of sent message ids
// @param dequeueMessages collector of received message ids
// @param timeoutMills    maximum wait in milliseconds
// @param consumedTimes   number of receptions required per id
// @return the ids still not consumed often enough (empty on success)
std::vector<std::string> VerifyUtils::waitForMessageConsume(DataCollector<std::string> &enqueueMessages, DataCollector<std::string> &dequeueMessages, long long timeoutMills, int consumedTimes)
{
    multi_logger->info("Set timeout: {}ms", timeoutMills);
    std::vector<std::string> pending = enqueueMessages.getAllData();
    const auto startedAt = std::chrono::steady_clock::now();
    while (!pending.empty())
    {
        std::vector<std::string> received = dequeueMessages.getAllData();
        // True when `sentId` has been received at least `consumedTimes` times.
        auto consumedEnough = [&](const std::string &sentId) -> bool
        {
            const auto hits = std::count(received.begin(), received.end(), sentId);
            if (hits < consumedTimes)
            {
                return false;
            }
            if (hits > consumedTimes)
            {
                multi_logger->error("More retry messages were consumed than expected (including one original message)"
                                    "Except: {}, Actual: {}, MsgId: {}",
                                    consumedTimes, hits, sentId);
                assert(false);
            }
            return true;
        };
        pending.erase(std::remove_if(pending.begin(), pending.end(), consumedEnough), pending.end());
        if (pending.empty())
        {
            break;
        }
        const auto elapsedMillis = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - startedAt).count();
        if (elapsedMillis >= timeoutMills)
        {
            multi_logger->error("Timeout but not received all send messages");
            break;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
    return pending;
}
// Polls `dequeueMessages` every 500 ms until each id in `enqueueMessages`
// matches the id of `consumedTimes` received messages, or until
// `timeoutMills` elapse.
//
// An id received more often than `consumedTimes` is logged and trips
// assert(false); it is then treated as consumed.
//
// @param enqueueMessages collector of sent message ids
// @param dequeueMessages collector of received MQMsg objects
// @param timeoutMills    maximum wait in milliseconds
// @param consumedTimes   number of receptions required per id
// @return the ids still not consumed often enough (empty on success)
std::vector<std::string> VerifyUtils::waitForMessageConsume(DataCollector<std::string> &enqueueMessages, DataCollector<MQMsg> &dequeueMessages, long long timeoutMills, int consumedTimes)
{
    multi_logger->info("Set timeout: {}ms", timeoutMills);
    std::vector<std::string> pending = enqueueMessages.getAllData();
    const auto startedAt = std::chrono::steady_clock::now();
    while (!pending.empty())
    {
        std::vector<MQMsg> received = dequeueMessages.getAllData();
        // True when `sentId` has been received at least `consumedTimes` times.
        auto consumedEnough = [&](const std::string &sentId) -> bool
        {
            const auto hits = std::count_if(received.begin(), received.end(),
                                            [&](const MQMsg &candidate)
                                            { return candidate.getMsgId() == sentId; });
            if (hits < consumedTimes)
            {
                return false;
            }
            if (hits > consumedTimes)
            {
                multi_logger->error("More retry messages were consumed than expected (including one original message)"
                                    "Except: {}, Actual: {}, MsgId: {}",
                                    consumedTimes, hits, sentId);
                assert(false);
            }
            return true;
        };
        pending.erase(std::remove_if(pending.begin(), pending.end(), consumedEnough), pending.end());
        if (pending.empty())
        {
            break;
        }
        const auto elapsedMillis = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - startedAt).count();
        if (elapsedMillis >= timeoutMills)
        {
            multi_logger->error("Timeout but not received all send messages");
            break;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
    return pending;
}
// Waits up to TIMEOUT * 1000 ms for every sent id to be received exactly once.
//
// @return true when nothing is left unconsumed; otherwise logs the number
//         of unconsumed ids and returns false
bool VerifyUtils::verifyNormalMessage(DataCollector<std::string> &enqueueMessages, DataCollector<std::string> &dequeueMessages)
{
    const std::vector<std::string> pending = waitForMessageConsume(enqueueMessages, dequeueMessages, TIMEOUT * 1000L, 1);
    if (pending.empty())
    {
        return true;
    }
    multi_logger->error("Not all messages were consumed, unConsumedMessages size: {}", pending.size());
    return false;
}
// Waits up to 30 seconds for consumption, then checks that the ids left
// unconsumed are exactly the ids in `unconsumedMsgIds`.
//
// `unconsumedMsgIds` is modified: every id found unconsumed is erased from it.
//
// @return false when an unconsumed id is not in `unconsumedMsgIds`, or when
//         ids remain in `unconsumedMsgIds` afterwards; true otherwise
bool VerifyUtils::verifyNormalMessage(DataCollector<std::string> &enqueueMessages, DataCollector<std::string> &dequeueMessages, std::unordered_set<std::string> &unconsumedMsgIds)
{
    std::vector<std::string> pending = waitForMessageConsume(enqueueMessages, dequeueMessages, 30 * 1000L, 1);
    for (auto &pendingId : pending)
    {
        // erase-by-key returns 0 when the id was not expected to stay unconsumed.
        if (unconsumedMsgIds.erase(pendingId) == 0)
        {
            multi_logger->error("Message {} should be consumed", pendingId);
            return false;
        }
    }
    if (!unconsumedMsgIds.empty())
    {
        // NOTE(review): this logs the size of the unconsumed list although the
        // condition tests the remaining expected ids - confirm which is intended.
        multi_logger->error("UnConsumedMessages size: {}", pending.size());
        return false;
    }
    return true;
}
// Waits up to TIMEOUT * 1000 ms for every sent id to appear exactly once
// among the received MQMsg objects.
//
// @return true when nothing is left unconsumed; otherwise logs the number
//         of unconsumed ids and returns false
bool VerifyUtils::verifyNormalMessage(DataCollector<std::string> &enqueueMessages, DataCollector<MQMsg> &dequeueMessages)
{
    const std::vector<std::string> pending = waitForMessageConsume(enqueueMessages, dequeueMessages, TIMEOUT * 1000L, 1);
    if (pending.empty())
    {
        return true;
    }
    multi_logger->error("Not all messages were consumed, unConsumedMessages size: {}", pending.size());
    return false;
}
// Verifies a property-filtered consumption: after waiting up to
// TIMEOUT * 1000 ms, no received message may carry any key/value pair from
// `props`, and the number of unconsumed ids must equal `expectedUnrecvMsgNum`.
//
// @param props                key/value pairs that received messages must NOT carry
// @param expectedUnrecvMsgNum expected count of sent ids left unconsumed
// @return true when both conditions hold, false (with an error log) otherwise
bool VerifyUtils::verifyNormalMessageWithUserProperties(DataCollector<std::string> &enqueueMessages, DataCollector<MQMsg> &dequeueMessages, std::map<std::string, std::string> &props, int expectedUnrecvMsgNum)
{
    std::vector<std::string> pending = waitForMessageConsume(enqueueMessages, dequeueMessages, TIMEOUT * 1000L, 1);
    std::vector<MQMsg> received = dequeueMessages.getAllData();
    for (auto &message : received)
    {
        auto messageProps = message.getProperties();
        for (auto &excluded : props)
        {
            auto found = messageProps.find(excluded.first);
            if (found == messageProps.end())
            {
                continue;
            }
            if (found->second == excluded.second)
            {
                multi_logger->error("sql attribute filtering is not in effect, consuming messages to other attributes");
                return false;
            }
        }
    }
    if (pending.size() != expectedUnrecvMsgNum)
    {
        multi_logger->error("Failed to consume all the sent data by sql filter");
        return false;
    }
    return true;
}
// Verifies delayed delivery: waits up to TIMEOUT * 1000 ms plus the delay of
// `delayLevel` for all messages, then uses checkDelay() to find messages
// whose observed interval is outside the tolerance.
//
// @return false when messages remain unconsumed or any message violates the
//         delay requirement (offenders are logged); true otherwise
bool VerifyUtils::verifyDelayMessage(DataCollector<std::string> &enqueueMessages, DataCollector<MQMsg> &dequeueMessages, int delayLevel)
{
    const std::vector<std::string> pending = waitForMessageConsume(enqueueMessages, dequeueMessages, TIMEOUT * 1000L + getDelayTime(delayLevel), 1);
    if (!pending.empty())
    {
        multi_logger->error("Not all messages were consumed, unConsumedMessages size: {}", pending.size());
        return false;
    }
    const std::unordered_map<std::string, long> offenders = checkDelay(dequeueMessages, delayLevel);
    if (offenders.empty())
    {
        return true;
    }
    // Build one report listing every offending message and its interval.
    std::ostringstream report;
    report << "The following messages do not meet the delay requirements \n";
    for (const auto &entry : offenders)
    {
        report << entry.first << " , interval:" << entry.second << "\n";
    }
    multi_logger->error(report.str());
    return false;
}
// Verifies ordered delivery: waits up to TIMEOUT * 1000 ms for all messages,
// then runs checkOrder() on the received messages.
//
// @return false when messages remain unconsumed or are out of order (both
//         cases are logged); true otherwise
bool VerifyUtils::verifyOrderMessage(DataCollector<std::string> &enqueueMessages, DataCollector<MQMsg> &dequeueMessages)
{
    const std::vector<std::string> pending = waitForMessageConsume(enqueueMessages, dequeueMessages, TIMEOUT * 1000L, 1);
    if (!pending.empty())
    {
        multi_logger->error("Not all messages were consumed, unConsumedMessages size: {}", pending.size());
        return false;
    }
    if (checkOrder(dequeueMessages))
    {
        return true;
    }
    multi_logger->error("Message out of order");
    return false;
}
bool VerifyUtils::checkOrderMessage(std::unordered_map<std::string, std::vector<MQMsg>> &receivedMessage)
{
for (auto &pair : receivedMessage)
{
std::ostringstream oss;
int preNode = -1;
std::string key = pair.first;
std::vector<MQMsg> msgs = pair.second;
std::string tag = msgs[0].getTags();
for (auto &msg : msgs)
{
if (msg.getTags() != tag)
{
preNode = -1;
}
int curNode = std::stoi(msg.getBody());
oss << curNode << ",";
if (preNode > curNode)
{
multi_logger->error(oss.str());
return false;
}
preNode = curNode;
}
}
return true;
}
// Under `mtx`, blanks the first entry of `msgs` equal to `msgId` and
// decrements `recvCount` by one. Prints the entry before and after the
// change. Does nothing when no entry matches.
void modifyString2Empty(const std::string &msgId, std::vector<std::string> &msgs, std::mutex &mtx, std::atomic<int> &recvCount)
{
    std::lock_guard<std::mutex> guard(mtx);
    const auto hit = std::find(msgs.begin(), msgs.end(), msgId);
    if (hit == msgs.end())
    {
        return;
    }
    std::cout << "msg id: " << *hit << std::endl;
    hit->clear();
    std::cout << "msg id change: " << *hit << std::endl;
    --recvCount;
}
bool VerifyUtils::waitReceiveThenAck(std::shared_ptr<RMQNormalProducer> producer, std::shared_ptr<rocketmq::DefaultMQPullConsumer> pullConsumer, std::string &topic, std::string &tag, int maxMessageNum)
{
long endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count() + TIMEOUT * 1000L;
std::vector<rocketmq::MQMessageQueue> mqs;
pullConsumer->fetchSubscribeMessageQueues(topic, mqs);
std::vector<std::string> sendMsgs = producer->getEnqueueMessages()->getAllData();
std::atomic<int> recvCount(sendMsgs.size());
while (endTime > std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
{
std::vector<std::function<void()>> runnables;
for (auto &mq : mqs)
{
runnables.push_back([&]()
{
long long offset = pullConsumer->fetchConsumeOffset(mq, false);
if(offset<0) return;
rocketmq::PullResult pullResult = pullConsumer->pull(mq, tag, offset, maxMessageNum);
switch (pullResult.pullStatus) {
case rocketmq::FOUND:
for (auto& msg : pullResult.msgFoundList) {
for (auto& sendMsg : sendMsgs) {
if(msg.getMsgId() == sendMsg){
sendMsg = "";
recvCount--;
offset += 1;
pullConsumer->updateConsumeOffset(mq, offset);
}
}
}
break;
case rocketmq::NO_MATCHED_MSG:
break;
case rocketmq::NO_NEW_MSG:
break;
case rocketmq::OFFSET_ILLEGAL:
break;
default:
break;
} });
}
if (recvCount == 0)
{
break;
}
std::vector<std::future<void>> futures;
for (const auto &runnable : runnables)
{
futures.push_back(std::async(std::launch::async, runnable));
}
// 等待所有函数对象完成
for (auto &future : futures)
{
future.get();
}
}
if (recvCount != 0)
{
multi_logger->error("Not all messages were consumed, unConsumedMessages size: {}", recvCount);
return false;
}
else
{
return true;
}
}
bool VerifyUtils::waitFIFOParamReceiveThenNAck(std::shared_ptr<RMQNormalProducer> producer, std::shared_ptr<rocketmq::DefaultMQPullConsumer> pullConsumer, std::string &topic, std::string &tag, int maxMessageNum)
{
long endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count() + 30 * 1000L;
std::vector<rocketmq::MQMessageQueue> mqs;
pullConsumer->fetchSubscribeMessageQueues(topic, mqs);
SimpleConcurrentHashMap<std::string, rocketmq::MQMessageExt> receivedMap;
while (endTime > std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
{
std::vector<std::function<void()>> runnables;
for (auto &mq : mqs)
{
runnables.push_back([&]()
{
long long offset = pullConsumer->fetchConsumeOffset(mq, false);
if(offset<0) return;
rocketmq::PullResult pullResult = pullConsumer->pull(mq, tag, offset, maxMessageNum);
switch (pullResult.pullStatus) {
case rocketmq::FOUND:
for (auto& msg : pullResult.msgFoundList) {
if(receivedMap.contains(msg.getMsgId())){
receivedMap.insert(msg.getMsgId(),msg);
}
}
break;
case rocketmq::NO_MATCHED_MSG:
break;
case rocketmq::NO_NEW_MSG:
break;
case rocketmq::OFFSET_ILLEGAL:
break;
default:
break;
} });
}
std::vector<std::future<void>> futures;
for (const auto &runnable : runnables)
{
futures.push_back(std::async(std::launch::async, runnable));
}
// 等待所有函数对象完成并获取结果
for (auto &future : futures)
{
future.get();
}
}
for (auto &msg : receivedMap.getAllValues())
{
int id = std::stoi(msg.getBody());
if (id >= 8)
{
multi_logger->error("Consumption out of order, expected :Body=0 Actual :Body={}", id);
return false;
}
}
return true;
}
// For 30 seconds, repeatedly pulls from every queue of `topic` (one async
// task per queue per round) and checks FIFO order while acknowledging every
// message except the one whose integer body is 19.
//
// Per pull, the i-th message found (0-based) must have body i, unless its
// body is 19. Messages other than 19 advance the consume offset by one and
// are counted per message id in `map`.
//
// Returns false when a task sees an out-of-order body, when `map` holds 20
// or more ids, or when any id has a count above 1; returns true once the
// 30-second window has passed without such a failure.
// `producer` and `receivedMessage` are not used by this function.
bool VerifyUtils::waitFIFOParamReceiveThenAckExceptedLast(std::shared_ptr<RMQNormalProducer> producer, std::shared_ptr<rocketmq::DefaultMQPullConsumer> pullConsumer, std::string &topic, std::string &tag, int maxMessageNum)
{
// Deadline as wall-clock milliseconds since the epoch, 30 s from now.
long endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count() + 30 * 1000L;
std::vector<rocketmq::MQMessageQueue> mqs;
pullConsumer->fetchSubscribeMessageQueues(topic, mqs);
std::vector<rocketmq::MQMessageExt> receivedMessage;
// Message id -> number of times it was acknowledged.
SimpleConcurrentHashMap<std::string, std::atomic<int>> map;
while (endTime > std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
{
// One task per queue; each returns false on an ordering violation.
std::vector<std::function<bool()>> runnables;
for (auto &mq : mqs)
{
runnables.push_back([&]()
{
int count = 0; // expected body of the next message in this pull; local to the task, so no atomic is needed
long long offset = pullConsumer->fetchConsumeOffset(mq, false);
if(offset<0) return true;
rocketmq::PullResult pullResult = pullConsumer->pull(mq, tag, offset, maxMessageNum);
switch (pullResult.pullStatus) {
case rocketmq::FOUND:
for(int j=0;j<pullResult.msgFoundList.size();j++){
std::string msgId = pullResult.msgFoundList[j].getMsgId();
int id = std::stoi(pullResult.msgFoundList[j].getBody());
// Body 19 is exempt from the position check.
if(id != 19 && count!=id) {
multi_logger->error("Consumption out of order, expected :order={} Actual :order={}",count,id);
return false;
}
count++;
// Acknowledge (advance the offset) and count every message except body 19.
if(id != 19){
offset+=1;
pullConsumer->updateConsumeOffset(mq, offset);
if(map.contains(msgId)){
map[msgId]++;
}else{
std::atomic<int> val(1);
map.insert(msgId,val.load());
}
}
}
break;
case rocketmq::NO_MATCHED_MSG:
break;
case rocketmq::NO_NEW_MSG:
break;
case rocketmq::OFFSET_ILLEGAL:
break;
default:
break;
}
return true; });
}
// Checked before launching this round's tasks, i.e. against earlier rounds:
// fail when 20 or more distinct ids were acknowledged.
if (map.size() >= 20)
{
return false;
}
// Fail when any id was acknowledged more than once.
for (auto &value : map.getAllValues())
{
if (value > 1)
{
return false;
}
}
std::vector<std::future<bool>> futures;
for (const auto &runnable : runnables)
{
futures.push_back(std::async(std::launch::async, runnable));
}
// Wait for all tasks to finish and collect their results.
for (auto &future : futures)
{
bool result = future.get();
if (!result)
return false;
}
}
return true;
}
bool VerifyUtils::waitFIFOReceiveThenAck(std::shared_ptr<RMQNormalProducer> producer, std::shared_ptr<rocketmq::DefaultMQPullConsumer> pullConsumer, std::string &topic, std::string &tag, int maxMessageNum)
{
long endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count() + TIMEOUT * 1000L;
std::vector<std::string> sendCollection = producer->getEnqueueMessages()->getAllData();
try
{
while (endTime > std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
{
std::vector<rocketmq::MQMessageQueue> mqs;
pullConsumer->fetchSubscribeMessageQueues(topic, mqs);
for (auto &mq : mqs)
{
std::unordered_map<std::string, std::vector<MQMsg>> receivedMessage;
long long offset = pullConsumer->fetchConsumeOffset(mq, false);
if (offset < 0)
continue;
rocketmq::PullResult pullResult = pullConsumer->pull(mq, tag, offset, maxMessageNum);
switch (pullResult.pullStatus)
{
case rocketmq::FOUND:
for (int j = 0; j < pullResult.msgFoundList.size(); j++)
{
int id = std::stoi(pullResult.msgFoundList[j].getBody()) / 20;
offset += 1;
pullConsumer->updateConsumeOffset(mq, offset);
sendCollection.erase(std::remove_if(sendCollection.begin(), sendCollection.end(),
[&](const std::string &enqueueMessageId)
{
if (pullResult.msgFoundList[j].getMsgId() == enqueueMessageId)
{
return true;
}
return false;
}),
sendCollection.end());
std::string msgId(std::to_string(id));
if (receivedMessage.find(msgId) != receivedMessage.end())
{
receivedMessage[msgId].push_back(MQMsg(pullResult.msgFoundList[j]));
}
else
{
std::vector<MQMsg> msgs;
msgs.push_back(MQMsg(pullResult.msgFoundList[j]));
receivedMessage[msgId] = msgs;
}
}
break;
case rocketmq::NO_MATCHED_MSG:
break;
case rocketmq::NO_NEW_MSG:
break;
case rocketmq::OFFSET_ILLEGAL:
break;
default:
break;
}
if (!checkOrderMessage(receivedMessage))
{
return false;
}
}
if (sendCollection.size() == 0)
break;
}
if (sendCollection.size() != 0)
{
multi_logger->error("Not all messages were consumed, unConsumedMessages size: {}", sendCollection.size());
return false;
}
}
catch (const std::exception &e)
{
multi_logger->error("{}", e.what());
}
return true;
}
bool VerifyUtils::waitAckExceptionReReceiveAck(std::shared_ptr<RMQNormalProducer> producer, std::shared_ptr<rocketmq::DefaultMQPullConsumer> pullConsumer, std::string &topic, std::string &tag, int maxMessageNum)
{
long endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count() + 60 * 1000L;
try
{
while (endTime > std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
{
std::vector<rocketmq::MQMessageQueue> mqs;
pullConsumer->fetchSubscribeMessageQueues(topic, mqs);
for (auto &mq : mqs)
{
long long offset = pullConsumer->fetchConsumeOffset(mq, false);
if (offset < 0)
continue;
rocketmq::PullResult pullResult = pullConsumer->pull(mq, tag, offset, maxMessageNum);
switch (pullResult.pullStatus)
{
case rocketmq::FOUND:
for (int j = 0; j < pullResult.msgFoundList.size(); j++)
{
multi_logger->info("Message: {}", pullResult.msgFoundList[j].toString());
std::this_thread::sleep_for(std::chrono::seconds(11));
offset += 1;
pullConsumer->updateConsumeOffset(mq, offset);
}
break;
case rocketmq::NO_MATCHED_MSG:
break;
case rocketmq::NO_NEW_MSG:
break;
case rocketmq::OFFSET_ILLEGAL:
break;
default:
break;
}
}
}
}
catch (const std::exception &e)
{
multi_logger->error("{}", e.what());
return true;
}
return true;
}
bool VerifyUtils::waitReceiveMaxsizeSync(std::shared_ptr<RMQNormalProducer> producer, std::shared_ptr<rocketmq::DefaultMQPullConsumer> pullConsumer, std::string &topic, std::string &tag, int maxMessageNum)
{
long endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count() + TIMEOUT * 1000L;
SimpleConcurrentHashMap<std::string, rocketmq::MQMessageExt> map;
try
{
while (endTime > std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
{
std::vector<rocketmq::MQMessageQueue> mqs;
pullConsumer->fetchSubscribeMessageQueues(topic, mqs);
for (auto &mq : mqs)
{
long long offset = pullConsumer->fetchConsumeOffset(mq, false);
if (offset < 0)
continue;
rocketmq::PullResult pullResult = pullConsumer->pull(mq, tag, offset, maxMessageNum);
switch (pullResult.pullStatus)
{
case rocketmq::FOUND:
for (int j = 0; j < pullResult.msgFoundList.size(); j++)
{
multi_logger->info("Message: {}", pullResult.msgFoundList[j].toString());
std::string msgId = pullResult.msgFoundList[j].getMsgId();
if (map.contains(msgId))
{
multi_logger->error("Duplicate message");
return false;
}
else
{
offset += 1;
pullConsumer->updateConsumeOffset(mq, offset);
map.insert(msgId, pullResult.msgFoundList[j]);
}
}
break;
case rocketmq::NO_MATCHED_MSG:
break;
case rocketmq::NO_NEW_MSG:
break;
case rocketmq::OFFSET_ILLEGAL:
break;
default:
break;
}
}
multi_logger->info("receive {} messages", map.size());
if (map.size() == 300)
break;
}
DataCollector<std::string> &dequeueMessages = DataCollectorManager<std::string>::getInstance().fetchListDataCollector(RandomUtils::getStringByUUID());
for (auto &value : map.getAllValues())
{
dequeueMessages.addData(value.getMsgId());
}
if (!VerifyUtils::verifyNormalMessage(*(producer->getEnqueueMessages()), dequeueMessages))
{
return false;
}
}
catch (const std::exception &e)
{
multi_logger->error("{}", e.what());
return false;
}
return true;
}
bool VerifyUtils::waitReceiveMultiNack(std::shared_ptr<RMQNormalProducer> producer, std::shared_ptr<rocketmq::DefaultMQPullConsumer> pullConsumer, std::string &topic, std::string &tag, int maxMessageNum)
{
long endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count() + 30 * 1000L;
std::vector<std::function<bool()>> runnables;
SimpleConcurrentHashMap<std::string, rocketmq::MQMessageExt> recvMsgs;
bool flag{true};
std::unordered_set<std::string> unconsumedMsgIds;
for (int i = 0; i < 4; i++)
{
while (endTime > std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
{
try
{
std::vector<rocketmq::MQMessageQueue> mqs;
pullConsumer->fetchSubscribeMessageQueues(topic, mqs);
for (auto &mq : mqs)
{
long long offset = pullConsumer->fetchConsumeOffset(mq, false);
if (offset < 0)
continue;
rocketmq::PullResult pullResult = pullConsumer->pull(mq, tag, offset, maxMessageNum);
switch (pullResult.pullStatus)
{
case rocketmq::FOUND:
for (int j = 0; j < pullResult.msgFoundList.size(); j++)
{
int id = std::stoi(pullResult.msgFoundList[j].getBody());
if (id == 19 && flag)
{
flag = false;
unconsumedMsgIds.insert(pullResult.msgFoundList[j].getMsgId());
}
else
{
if (id == 19)
{
unconsumedMsgIds.insert(pullResult.msgFoundList[j].getMsgId());
}
else
{
multi_logger->info("Message: {}", pullResult.msgFoundList[j].toString());
offset += 1;
pullConsumer->updateConsumeOffset(mq, offset);
if (recvMsgs.contains(pullResult.msgFoundList[j].getMsgId()))
{
multi_logger->error("Duplicate message");
return false;
}
else
{
recvMsgs.insert(pullResult.msgFoundList[j].getMsgId(), pullResult.msgFoundList[j]);
}
}
}
}
break;
case rocketmq::NO_MATCHED_MSG:
break;
case rocketmq::NO_NEW_MSG:
break;
case rocketmq::OFFSET_ILLEGAL:
break;
default:
break;
}
}
}
catch (const std::exception &e)
{
multi_logger->error("{}", e.what());
return false;
}
if (recvMsgs.size() == 20)
return false;
}
return true;
}
DataCollector<std::string> &dequeueMessages = DataCollectorManager<std::string>::getInstance().fetchListDataCollector(RandomUtils::getStringByUUID());
for (auto &value : recvMsgs.getAllValues())
{
dequeueMessages.addData(value.getMsgId());
}
if (!verifyNormalMessage(*(producer->getEnqueueMessages()), dequeueMessages, unconsumedMsgIds))
{
return false;
}
return true;
}