blob: b1da34e84a11fac4cc8f86f6a97e1f5afa18e999 [file] [log] [blame]
#include <unistd.h>
#include <sys/wait.h>
#include "postgres.h"
#include "cmockery.h"
#include "ic_modules.h"
#include "postmaster/postmaster.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "storage/latch.h"
#include "ic_test_env.h"
const char *progname = NULL;
static MemoryContext testMemoryContext = NULL;
pid_t server_ic_proxy_pid = -1;
pid_t client_ic_proxy_pid = -1;
static void
verify_chunk_tuple_list_serialized_tuple(TupleChunkListItem tc_item)
{
HeapTupleTableSlot *heap_tuple_slot = NULL;
char tc_item_verify_buff[COLUMN_TEXT_LEN];
heap_tuple_slot = get_tuple_slot_from_chunk_list_serialized_tuple(tc_item);
assert_true(heap_tuple_slot);
assert_int_equal(VARSIZE_ANY_EXHDR(heap_tuple_slot->base.tts_values[0]), COLUMN_TEXT_LEN);
generate_seq_buffer(tc_item_verify_buff, COLUMN_TEXT_LEN);
/* 1b string header */
assert_string_equal(tc_item_verify_buff, DatumGetPointer(heap_tuple_slot->base.tts_values[0]) + 1);
assert_int_equal(DatumGetPointer(heap_tuple_slot->base.tts_values[1]), COLUMN_INT4_VALUE);
cleanup_heap_tuple_slot(heap_tuple_slot);
}
static void
verify_chunk_tuple_list(TupleChunkListItem tc_item)
{
char tc_item_raw_verify_buff[TUPLE_CHUNK_RAW_BUFFER_LEN];
assert_true(tc_item);
assert_true(tc_item->p_next);
assert_false(tc_item->p_next->p_next);
verify_chunk_tuple_list_serialized_tuple(tc_item);
generate_seq_buffer(tc_item_raw_verify_buff, TUPLE_CHUNK_RAW_BUFFER_LEN);
assert_true(verify_chunk_list_raw_data(tc_item->p_next, tc_item_raw_verify_buff, TUPLE_CHUNK_RAW_BUFFER_LEN));
}
static TupleChunkListData *
build_chunk_tuple_slot(EState *estate, bool direct)
{
TupleChunkListData *tc_list;
TupleTableSlot *slot;
int sent = 0;
TupleChunkListItem item;
int32 raw_buffer_len = 0;
if (direct)
{
SerTupInfo tup_info;
struct directTransportBuffer direct_buffer;
tc_list = (TupleChunkListData *) palloc0(sizeof(TupleChunkListData));
slot = prepare_tuple_slot();
tup_info.tupdesc = slot->tts_tupleDescriptor;
CurrentMotionIPCLayer->GetTransportDirectBuffer(estate->interconnect_context, 1, 0, &direct_buffer);
assert_int_not_equal(direct_buffer.pri, 0);
assert_int_not_equal(direct_buffer.prilen, 0);
sent = SerializeTuple(slot, &tup_info, &direct_buffer, tc_list, 0);
assert_int_not_equal(sent, 0);
CurrentMotionIPCLayer->PutTransportDirectBuffer(estate->interconnect_context, 1, 0, sent);
cleanup_tuple_slot(slot);
}
else
{
/* Don't use InitSerTupInfo */
/* cause can't call SerializeTuple with system cache */
/*
* just build tc_list without call SerializeTuple when `direct` is
* false
*/
char tc_list_raw_buffer[TUPLE_CHUNK_RAW_BUFFER_LEN];
generate_seq_buffer(tc_list_raw_buffer, TUPLE_CHUNK_RAW_BUFFER_LEN);
tc_list = prepare_chunk_list_raw_data(tc_list_raw_buffer, TUPLE_CHUNK_RAW_BUFFER_LEN);
}
return tc_list;
}
static EState *
client_side_setup(MotionIPCLayer * motion_ipc_layer, int stc[2], int cts[2])
{
int32 listen_port = 0;
int32 server_listen_port = 0;
pid_t c_pid = 0;
pid_t server_pid = 0;
int8 already_setup = 1;
EState *estate;
client_side_global_var_init(motion_ipc_layer, &client_ic_proxy_pid);
CurrentMotionIPCLayer->InitMotionLayerIPC();
listen_port = CurrentMotionIPCLayer->GetListenPort();
assert_int_not_equal(listen_port, 0);
write_data_to_pipe(cts, listen_port, int32);
read_data_from_pipe(stc, &server_listen_port, int32);
c_pid = getpid();
write_data_to_pipe(cts, c_pid, pid_t);
read_data_from_pipe(stc, &server_pid, pid_t);
estate = prepare_estate( /* local_slice */ 1, server_listen_port,
listen_port,
server_pid,
c_pid);
CurrentMotionIPCLayer->SetupInterconnect(estate);
assert_true(estate->es_interconnect_is_setup);
assert_true(estate->interconnect_context != NULL);
write_data_to_pipe(cts, already_setup, int8);
return estate;
}
static void
forked_client(void **state, int stc[2], int cts[2], MotionIPCLayer * motion_ipc_layer)
{
EState *estate;
TupleChunkListData *tc_list1;
TupleChunkListData *tc_list2;
bool has_error = false;
assert_false(CurrentMotionIPCLayer);
estate = client_side_setup(motion_ipc_layer, stc, cts);
assert_true(estate);
assert_true(CurrentMotionIPCLayer);
tc_list1 = build_chunk_tuple_slot(estate, true);
tc_list2 = build_chunk_tuple_slot(estate, false);
CurrentMotionIPCLayer->SendTupleChunkToAMS(estate->interconnect_context, 1, 0, tc_list1->p_first);
CurrentMotionIPCLayer->SendEOS(estate->interconnect_context, 1, tc_list2->p_first);
CurrentMotionIPCLayer->TeardownInterconnect(estate->interconnect_context, &has_error);
assert_false(has_error);
CurrentMotionIPCLayer->CleanUpMotionLayerIPC();
shutdown_ic_proxy_if_need(client_ic_proxy_pid);
cleanup_chunk_list_raw_data(tc_list1);
cleanup_chunk_list_raw_data(tc_list2);
cleanup_estate(estate);
}
static EState *
server_side_setup(MotionIPCLayer * motion_ipc_layer, int stc[2], int cts[2])
{
int32 listen_port = 0;
int32 client_listen_port = 0;
pid_t c_pid = 0;
pid_t client_pid = 0;
int8 already_setup = 1;
EState *estate;
server_side_global_var_init(motion_ipc_layer, &server_ic_proxy_pid);
CurrentMotionIPCLayer->InitMotionLayerIPC();
listen_port = CurrentMotionIPCLayer->GetListenPort();
assert_int_not_equal(listen_port, 0);
read_data_from_pipe(cts, &client_listen_port, int32);
write_data_to_pipe(stc, listen_port, int32);
c_pid = getpid();
read_data_from_pipe(cts, &client_pid, pid_t);
write_data_to_pipe(stc, c_pid, pid_t);
estate = prepare_estate( /* local_slice */ 0, listen_port,
client_listen_port,
c_pid,
client_pid);
CurrentMotionIPCLayer->SetupInterconnect(estate);
assert_true(estate->es_interconnect_is_setup);
assert_true(estate->interconnect_context != NULL);
/* waiting for client setup */
read_data_from_pipe(cts, &already_setup, int8);
assert_int_equal(1, already_setup);
return estate;
}
static void
main_server(void **state, int stc[2], int cts[2], MotionIPCLayer * motion_ipc_layer)
{
int16 src_route = 0;
TupleChunkListItem tc_item;
EState *estate;
bool has_error = false;
assert_false(CurrentMotionIPCLayer);
estate = server_side_setup(motion_ipc_layer, stc, cts);
assert_true(estate);
assert_true(CurrentMotionIPCLayer);
tc_item = CurrentMotionIPCLayer->RecvTupleChunkFromAny(estate->interconnect_context, 1, &src_route);
verify_chunk_tuple_list(tc_item);
CurrentMotionIPCLayer->TeardownInterconnect(estate->interconnect_context, &has_error);
assert_false(has_error);
CurrentMotionIPCLayer->CleanUpMotionLayerIPC();
shutdown_ic_proxy_if_need(server_ic_proxy_pid);
pfree(tc_item);
cleanup_estate(estate);
}
static void
test_ic_interface(void **state, MotionIPCLayer * motion_ipc_layer)
{
pid_t f_pid;
int stc[2],
cts[2];
assert_false(CurrentMotionIPCLayer);
assert_int_equal(pipe(stc), 0);
assert_int_equal(pipe(cts), 0);
f_pid = fork();
assert_true(f_pid >= 0);
if (f_pid == 0)
{
forked_client(state, stc, cts, motion_ipc_layer);
close(stc[0]);
close(stc[1]);
close(cts[0]);
close(cts[1]);
MemoryContextReset(testMemoryContext);
testMemoryContext = NULL;
/*
* client side should not return assert can be catch in forked process
*/
exit(0);
}
main_server(state, stc, cts, motion_ipc_layer);
wait(NULL);
close(stc[0]);
close(stc[1]);
close(cts[0]);
close(cts[1]);
}
static void
test_ic_interface_tcp(void **state)
{
test_ic_interface(state, &tcp_ipc_layer);
}
static void
test_ic_interface_udpifc(void **state)
{
test_ic_interface(state, &udpifc_ipc_layer);
}
static void
test_ic_interface_proxy(void **state)
{
if (!proxy_ipc_layer.IcProxyServiceMain)
{
printf("enable-ic-proxy not set. skiped.\n");
return;
}
test_ic_interface(state, &proxy_ipc_layer);
}
static void
setup_data_structures(void **state)
{
if (NULL == TopMemoryContext)
{
assert_true(NULL == testMemoryContext);
MemoryContextInit();
testMemoryContext = AllocSetContextCreate(TopMemoryContext,
"Test Context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
MemoryContextSwitchTo(testMemoryContext);
}
}
static void
teardown_data_structures(void **state)
{
MemoryContextReset(testMemoryContext);
testMemoryContext = NULL;
TopMemoryContext = NULL;
CurrentMotionIPCLayer = NULL;
}
int
main(int argc, char *argv[])
{
progname = get_progname(argv[0]);
cmockery_parse_arguments(argc, argv);
const UnitTest tests[] = {
unit_test_setup_teardown(test_ic_interface_tcp, setup_data_structures, teardown_data_structures),
unit_test_setup_teardown(test_ic_interface_udpifc, setup_data_structures, teardown_data_structures),
unit_test_setup_teardown(test_ic_interface_proxy, setup_data_structures, teardown_data_structures)
};
return run_tests(tests);
return 0;
}