blob: a87fe2951c1406e5e91ea585128e2cd4e458492a [file] [log] [blame]
#ifndef MQ_BASE_TEST_H
#define MQ_BASE_TEST_H
#include <atomic>
#include <mutex>
#include <condition_variable>
#include "rocketmq.h"
#include "rocketmq-ons-cpp-full.h"
#include "ONSCallback.h"
#include "Message.h"
#include "UtilAll.h"
#include "Common.h"
namespace ons {
class send_result_wrapper {
public:
send_result_wrapper(send_result &sendResult) : sendResult_(sendResult) {
memset(&sendResult, 0, sizeof(sendResult));
sendResult.message_id = new char[ONS_SEND_RESULT_MSG_ID_LEN_MAX + 1];
memset(sendResult.message_id, 0, ONS_SEND_RESULT_MSG_ID_LEN_MAX + 1);
sendResult.error_msg = new char[ONS_SEND_RESULT_ERR_MSG_LEN_MAX + 1];
memset(sendResult.error_msg, 0, ONS_SEND_RESULT_ERR_MSG_LEN_MAX + 1);
}
~send_result_wrapper() {
delete[](sendResult_.message_id);
delete[](sendResult_.error_msg);
}
private:
send_result &sendResult_;
};
class message_creator {
public:
message_creator(message &m, bool is_order = false) : m_(m) {
memset(&m, 0, sizeof(m));
if (!is_order) {
const char *topic = "t_opensource_unit_test";
m.topic = new char[strlen(topic) + 1];
strcpy(m.topic, topic);
} else {
const char *topic_ = "t_opensource_unit_test_order";
m.topic = new char[strlen(topic_) + 1];
strcpy(m.topic, topic_);
}
const char *tag = "tagA";
m.tags = new char[strlen(tag) + 1];
strcpy(m.tags, tag);
const char *body = "Sample message body content";
m.body = new char[strlen(body) + 1];
strcpy(m.body, body);
m.body_size = strlen(m.body);
const char *key = "abc";
m.key = new char[strlen(key) + 1];
strcpy(m.key, key);
const char *props = "{}";
m.user_prop = new char[strlen(props) + 1];
strcpy(m.user_prop, props);
m.system_prop = new char[strlen(props) + 1];
strcpy(m.system_prop, props);
}
~message_creator() {
delete[](m_.topic);
delete[](m_.tags);
delete[](m_.body);
delete[](m_.key);
delete[](m_.user_prop);
delete[](m_.system_prop);
}
private:
message &m_;
};
class subscribe_creator {
public:
subscribe_creator(subscription &sub,const char *topic, const char *sub_expression):sub_(sub) {
sub.topic = new char[strlen(topic) + 1];
strcpy(sub.topic, topic);
sub.sub_expression = new char[strlen(sub_expression) + 1];
strcpy(sub.sub_expression, sub_expression);
}
~subscribe_creator() {
delete[](sub_.topic);
delete[](sub_.sub_expression);
}
private:
subscription &sub_;
};
class string_wrapper {
public:
string_wrapper(char* &str,const char* value) : str_(str) {
str = new char[strlen(value) + 1];
strcpy(str, value);
}
~string_wrapper() {
delete str_;
}
private:
char* &str_;
};
class BaseTest : public ::testing::Test {
protected:
BaseTest() {}
void SetUp() override {
int create_isolate_status = graal_create_isolate(NULL, &isolate_, &thread_);
ASSERT_EQ(create_isolate_status, 0) << "error on isolate creation or attach";
memset(&property, 0, sizeof(factory_property));
const char *group_id = "GID_opensource_unit_test";
const char *access_key = "AK";
const char *access_secret = "SK";
const char *name_srv_addr = "47.100.33.127:9876";
group_id_ = new char[strlen(group_id) + 1];
strcpy(group_id_, group_id);
property.group_id = group_id_;
access_key_ = new char[strlen(access_key) + 1];
strcpy(access_key_, access_key);
property.access_key = access_key_;
access_secret_ = new char[strlen(access_secret) + 1];
strcpy(access_secret_, access_secret);
property.access_secret = access_secret_;
name_srv_addr_ = new char[strlen(name_srv_addr) + 1];
strcpy(name_srv_addr_, name_srv_addr);
property.name_srv_addr = name_srv_addr_;
total = 32;
}
void TearDown() override {
delete group_id_;
group_id_ = NULL;
delete access_key_;
access_key_ = NULL;
delete access_secret_;
access_secret_ = NULL;
delete name_srv_addr_;
name_srv_addr_ = NULL;
}
void updatePropertyKeyValue(const char *key, const char *value) {
if (0 == strcasecmp(key, "group_id")) {
if (group_id_ != NULL) {
delete group_id_;
group_id_ = NULL;
}
group_id_ = new char[strlen(value) + 1];
strcpy(group_id_, value);
property.group_id = group_id_;
}
if (0 == strcasecmp(key, "access_key")) {
if (access_key_ != NULL) {
delete access_key_;
access_key_ = NULL;
}
access_key_ = new char[strlen(value) + 1];
strcpy(access_key_, value);
property.access_key = access_key_;
}
if (0 == strcasecmp(key, "access_secret")) {
if (access_secret_ != NULL) {
delete access_secret_;
access_secret_ = NULL;
}
access_secret_ = new char[strlen(value) + 1];
strcpy(access_secret_, value);
property.access_secret = access_secret_;
}
if (0 == strcasecmp(key, "name_srv_addr")) {
if (name_srv_addr_ != NULL) {
delete name_srv_addr_;
name_srv_addr_ = NULL;
}
name_srv_addr_ = new char[strlen(value) + 1];
strcpy(name_srv_addr_, value);
property.name_srv_addr = name_srv_addr_;
}
}
graal_isolate_t *isolate_;
graal_isolatethread_t *thread_;
char *group_id_;
char *access_key_;
char *access_secret_;
char *name_srv_addr_;
factory_property property;
int total;
};
std::mutex m1;
std::mutex m2;
std::condition_variable cv;
class MyCallback : public ons::SendCallbackONS {
public:
void onSuccess(ons::SendResultONS &sendResult) override {
std::lock_guard<std::mutex> lg(m2);
success_num++;
std::cout << "send success, message_id: " << sendResult.getMessageId() << ", total: " << success_num
<< std::endl;
if (success_num + failed_num == message_num) {
cv.notify_all();
}
}
void onException(ons::ONSClientException &e) override {
std::lock_guard<std::mutex> lg(m2);
failed_num++;
std::cout << "send failure, total: " << failed_num << std::endl;
std::cout << e.what() << std::endl;
if (success_num + failed_num == message_num) {
cv.notify_all();
}
}
static int success_num;
static int failed_num;
static int message_num;
};
int MyCallback::success_num = 0;
int MyCallback::failed_num = 0;
int MyCallback::message_num = 32;
class LocalTransactionCheckerImpl : public ons::LocalTransactionChecker {
virtual ons::TransactionStatus check(ons::Message &msg) {
cout << "checker::commit transaction" << endl;
return ons::CommitTransaction;
}
};
class LocalTransactionExecuterImpl : public ons::LocalTransactionExecuter {
virtual ons::TransactionStatus execute(ons::Message &msg) {
cout << "executer::commit transaction of msgid: " << msg.getMsgID() << endl;
return ons::CommitTransaction;
}
};
std::mutex console_mtx;
class ExampleMessageListener : public ons::MessageListener {
public:
Action consume(ons::Message &message, ons::ConsumeContext &context) {
std::lock_guard<std::mutex> lk(console_mtx);
std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: "
<< message.getMsgID() << std::endl;
return CommitMessage;
}
};
class ExampleMessageOrderListener : public ons::MessageOrderListener {
public:
OrderAction consume(ons::Message &message, ons::ConsumeOrderContext &context) {
std::lock_guard<std::mutex> lk(console_mtx);
std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: "
<< message.getMsgID() << std::endl;
return Success;
}
};
#ifdef __cplusplus
extern "C" {
#endif
void on_success_func(void *thread, char *message_id, char *send_callback_ons) {
auto sendCallbackONS = reinterpret_cast<ons::SendCallbackONS *>(send_callback_ons);
ons::SendResultONS sendResultOns;
sendResultOns.setMessageId(message_id);
sendCallbackONS->onSuccess(sendResultOns);
}
void on_exception_func(void *thread, char *m_msg, int m_error, char *send_callback_ons) {
auto sendCallbackONS = reinterpret_cast<ons::SendCallbackONS *>(send_callback_ons);
ons::ONSClientException onsClientException(m_msg, m_error);
sendCallbackONS->onException(onsClientException);
}
int transaction_check_func(void *thread, void *opaque, char *topic, char *user_props, char *sys_props,
char *body, int body_len) {
ons::LocalTransactionChecker *checker = reinterpret_cast<ons::LocalTransactionChecker *>(opaque);
ons::Message message_;
message_.setTopic(topic);
ons::UtilAll::fill_message_props(message_, std::string(user_props), false);
ons::UtilAll::fill_message_props(message_, std::string(sys_props), true);
message_.setBody(reinterpret_cast<unsigned char *>(body), body_len);
return checker->check(message_);
}
int transaction_execute_func(void *thread, void *opaque, char *topic, char *user_props, char *sys_props,
char *body, int body_len) {
ons::LocalTransactionExecuter *executor = reinterpret_cast<ons::LocalTransactionExecuter *>(opaque);
ons::Message message_;
message_.setTopic(topic);
ons::UtilAll::fill_message_props(message_, std::string(user_props), false);
ons::UtilAll::fill_message_props(message_, std::string(sys_props), true);
message_.setBody(reinterpret_cast<unsigned char *>(body), body_len);
return executor->execute(message_);
}
int consumer_on_message(void *thread, void *opaque, char *topic, char *user_props, char *sys_props, char *body,
int body_len) {
ons::MessageListener *listener = reinterpret_cast<ons::MessageListener *>(opaque);
ons::Message message_;
message_.setTopic(topic);
ons::UtilAll::fill_message_props(message_, std::string(user_props), false);
ons::UtilAll::fill_message_props(message_, std::string(sys_props), true);
message_.setBody(reinterpret_cast<unsigned char *>(body), body_len);
ons::ConsumeContext ctx_;
Action action = listener->consume(message_, ctx_);
switch (action) {
case CommitMessage:
return 0;
case ReconsumeLater:
return 1;
default:
return 1;
}
}
int
order_consumer_on_message(void *thread, void *opaque, char *topic, char *user_props, char *sys_props, char *body,
int body_len) {
ons::MessageOrderListener *listener = reinterpret_cast<ons::MessageOrderListener *>(opaque);
ons::Message message_;
message_.setTopic(topic);
ons::UtilAll::fill_message_props(message_, std::string(user_props), false);
ons::UtilAll::fill_message_props(message_, std::string(sys_props), true);
message_.setBody(reinterpret_cast<unsigned char *>(body), body_len);
ons::ConsumeOrderContext ctx_;
OrderAction action = listener->consume(message_, ctx_);
switch (action) {
case Success:
return 0;
case Suspend:
return 1;
default:
return 1;
}
}
#ifdef __cplusplus
}
#endif
} // namespace ons
#endif // MQ_BASE_TEST_H