blob: 6efa8328170e690d11f84681efc772088818f630 [file] [log] [blame]
#include <gtest/gtest.h>
#include <iostream>
#include "BaseTest.h"
namespace ons {
class ApiTest : public BaseTest {
};
TEST_F(ApiTest, test_create_producer) {
int instance_id = create_producer(thread_, &property);
EXPECT_TRUE(instance_id >= 0) << "Instance Id should be non-negative";
destroy_instance(thread_, instance_id);
ASSERT_EQ(graal_detach_thread(thread_), 0) << "Detach graal thread failed";
}
TEST_F(ApiTest, test_create_order_producer) {
int instance_id = create_order_producer(thread_, &property);
EXPECT_TRUE(instance_id >= 0) << "Instance Id should be non-negative";
destroy_instance(thread_, instance_id);
ASSERT_EQ(graal_detach_thread(thread_), 0) << "Detach graal thread failed";
}
TEST_F(ApiTest, test_create_transaction_producer) {
void *transaction_check = (void *) transaction_check_func;
LocalTransactionCheckerImpl pTransactionCheckListener;
void *checker = reinterpret_cast<void *>(&pTransactionCheckListener);
int instance_id = create_transaction_producer(thread_, &property, checker, transaction_check);
EXPECT_TRUE(instance_id >= 0) << "Instance Id should be non-negative";
destroy_instance(thread_, instance_id);
ASSERT_EQ(graal_detach_thread(thread_), 0) << "Detach graal thread failed";
}
TEST_F(ApiTest, test_create_consumer) {
int consumer_id = create_consumer(thread_, &property);
EXPECT_TRUE(consumer_id >= 0) << "Instance Id should be non-negative";
destroy_instance(thread_, consumer_id);
ASSERT_EQ(graal_detach_thread(thread_), 0) << "Detach graal thread failed";
}
TEST_F(ApiTest, test_create_order_consumer) {
int consumer_id = create_order_consumer(thread_, &property);
EXPECT_TRUE(consumer_id >= 0) << "Instance Id should be non-negative";
destroy_instance(thread_, consumer_id);
ASSERT_EQ(graal_detach_thread(thread_), 0) << "Detach graal thread failed";
}
TEST_F(ApiTest, test_send_message) {
int instance_id = create_producer(thread_, &property);
message m;
message_creator creator(m);
send_result sr;
send_result_wrapper wrapper(sr);
for (int i = 0; i < total; i++) {
send_message(thread_, instance_id, &m, &sr);
ASSERT_EQ(sr.error_no, 0);
std::cout << "Send OK, MsgId: " << sr.message_id << std::endl;
}
destroy_instance(thread_, instance_id);
std::cout << "Destroy instance OK" << std::endl;
ASSERT_EQ(graal_detach_thread(thread_), 0) << "Detach graal thread failed";
}
TEST_F(ApiTest, test_send_message_oneway) {
int instance_id = create_producer(thread_, &property);
message m;
message_creator creator(m);
send_result sendResult;
send_result_wrapper wrapper(sendResult);
bzero(&sendResult, sizeof(send_result));
for (int i = 0; i < total; i++) {
send_message_oneway(thread_, instance_id, &m, &sendResult);
}
destroy_instance(thread_, instance_id);
std::cout << "Destroy instance OK" << std::endl;
ASSERT_EQ(graal_detach_thread(thread_), 0) << "Detach graal thread failed";
ASSERT_EQ(0, sendResult.error_no);
}
TEST_F(ApiTest, test_send_order_message) {
updatePropertyKeyValue("group_id", "GID_opensource_unit_test_order");
int instance_id = create_order_producer(thread_, &property);
message m;
message_creator creator(m, true);
send_result sr;
send_result_wrapper wrapper(sr);
char *sharding_key;
string_wrapper stringWrapper(sharding_key, "1");
for (int i = 0; i < total; i++) {
send_order_message(thread_, instance_id, &m, &sr, sharding_key);
ASSERT_EQ(sr.error_no, 0);
std::cout << "Send OK, MsgId: " << sr.message_id << std::endl;
}
destroy_instance(thread_, instance_id);
std::cout << "Destroy instance OK" << std::endl;
ASSERT_EQ(graal_detach_thread(thread_), 0) << "Detach graal thread failed";
}
TEST_F(ApiTest, test_send_message_async) {
MyCallback pSendCallback;
int instance_id = create_producer(thread_, &property);
message m;
message_creator creator(m);
send_result sendResult;
send_result_wrapper wrapper(sendResult);
callback_func c_f;
c_f.on_success = on_success_func;
c_f.on_exception = on_exception_func;
c_f.send_callback_ons = reinterpret_cast<char *>(&pSendCallback);
for (int i = 0; i < MyCallback::message_num; i++) {
send_message_async(thread_, instance_id, &m, &sendResult, &c_f);
}
{
std::unique_lock<std::mutex> lk(m1);
cv.wait(lk);
}
destroy_instance(thread_, instance_id);
std::cout << "Destroy instance OK" << std::endl;
ASSERT_EQ(graal_detach_thread(thread_), 0) << "Detach graal thread failed";
}
TEST_F(ApiTest, send_message_transaction) {
void *transaction_check = (void *) transaction_check_func;
LocalTransactionCheckerImpl pTransactionCheckListener;
void *checker = reinterpret_cast<void *>(&pTransactionCheckListener);
int instance_id = create_transaction_producer(thread_, &property, checker, transaction_check);
LocalTransactionExecuterImpl pTransactionExecutor;
message m;
message_creator creator(m);
send_result sr;
send_result_wrapper wrapper(sr);
void *executor = reinterpret_cast<void *>(&pTransactionExecutor);
void *transaction_execute = (void *) transaction_execute_func;
for (int i = 0; i < total; i++) {
send_message_transaction(thread_, instance_id, &m, &sr, executor, transaction_execute);
ASSERT_EQ(sr.error_no, 0);
std::cout << "Send OK, MsgId: " << sr.message_id << std::endl;
}
destroy_instance(thread_, instance_id);
std::cout << "Destroy instance OK" << std::endl;
ASSERT_EQ(graal_detach_thread(thread_), 0) << "Detach graal thread failed";
}
TEST_F(ApiTest, subsribe) {
int instance_id = create_consumer(thread_, &property);
ExampleMessageListener messageListener;
ons::MessageListener *listener = &messageListener;
subscription sub;
memset(&sub, 0, sizeof(subscription));
subscribe_creator creator(sub, "t_opensource_unit_test", "*");
sub.opaque = listener;
sub.on_message = consumer_on_message;
::subscribe(thread_, instance_id, &sub);
start_instance(thread_, instance_id);
std::this_thread::sleep_for(std::chrono::milliseconds(20 * 1000));
destroy_instance(thread_, instance_id);
std::cout << "Destroy instance OK" << std::endl;
ASSERT_EQ(graal_detach_thread(thread_), 0) << "Detach graal thread failed";
}
TEST_F(ApiTest, subsribe_order) {
updatePropertyKeyValue("group_id", "GID_opensource_unit_test_order");
int instance_id = create_order_consumer(thread_, &property);
ExampleMessageOrderListener messageOrderListener;
ons::MessageOrderListener *listener = &messageOrderListener;
subscription sub;
memset(&sub, 0, sizeof(subscription));
subscribe_creator creator(sub, "t_opensource_unit_test_order", "tagA");
sub.opaque = listener;
sub.on_message = order_consumer_on_message;
subscribe_order_listener(thread_, instance_id, &sub);
start_instance(thread_, instance_id);
std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
destroy_instance(thread_, instance_id);
std::cout << "Destroy instance OK" << std::endl;
ASSERT_EQ(graal_detach_thread(thread_), 0) << "Detach graal thread failed";
}
} // namespace ons