#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <unistd.h>
#include <sys/wait.h>
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <getopt.h>

#include "postgres.h"
#include "ic_modules.h"
#include "ic_internal.h"
#include "postmaster/postmaster.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "storage/latch.h"
#include "ic_test_env.h"
| |
/*
 * Smallest acceptable MTU: interconnect packet header + tuple chunk header
 * + at least one payload byte.  The whole expansion is parenthesised so it
 * composes safely inside larger expressions (e.g. "mtu - MTU_LESS_LEN"), and
 * it is typed as int so comparisons against signed option values do not
 * silently convert a negative value into a huge unsigned one.
 */
#define MTU_LESS_LEN ((int) (sizeof(struct icpkthdr) + TUPLE_CHUNK_HEADER_SIZE + 1))
| |
| const char *progname = NULL; |
| static MemoryContext testMemoryContext = NULL; |
| |
| volatile bool interrupt_flag = false; |
| bool am_client_side = false; |
| pid_t server_side_pid = -1; |
| pid_t client_side_pid = -1; |
| |
| pid_t server_ic_proxy_pid = -1; |
| pid_t client_ic_proxy_pid = -1; |
| |
/*
 * Long-option table for getopt_long().  Each entry mirrors a short option in
 * main()'s option string: the "val" field is the short option character the
 * switch statement handles.
 */
static struct option long_options[] = {
	/* only honoured as the first argument; main() checks argv[1] itself */
	{"help", no_argument, NULL, '?'},
	{"type", required_argument, NULL, 't'},
	{"interval", required_argument, NULL, 'i'},
	{"verify", optional_argument, NULL, 'v'},
	{"mtu", required_argument, NULL, 'm'},
	/* documented by usage() as -b/--bsize but previously missing here */
	{"bsize", required_argument, NULL, 'b'},
	/*
	 * -d takes no value and the handler ignores optarg, so a bare --direct
	 * must be accepted; optional_argument still tolerates "--direct=<x>".
	 */
	{"direct", optional_argument, NULL, 'd'},
	{NULL, 0, NULL, 0}
};
| |
| struct bench_options |
| { |
| int ic_type; |
| int interval; |
| bool should_verify; |
| /* in fact, mtu is currently only valid for the client side */ |
| int mtu; |
| int bsize; |
| bool direct_buffer; |
| |
| MotionIPCLayer *ipc_layer; |
| }; |
| |
| static inline void |
| usage() |
| { |
| printf("%s - Interconntect benchmark \n\n", progname); |
| printf("Usage:\n %s [OPTION]...\n\n", progname); |
| printf("Options:\n"); |
| |
| printf(" -t, --type <ic type> Specify the interconnection type to run benchmark\n" |
| " The value range is [0-2]:\n" |
| " 0 - tcp\n" |
| " 1 - udpifc\n" |
| " 2 - proxy\n" |
| " default tcp(0)\n"); |
| printf(" -i, --interval <second> The duration of the benchmark. default is \"60s\"\n"); |
| printf(" -v, --verify Verify the result in recv side. default is \"false\"\n"); |
| printf(" -m, --mtu The MTU setting. default is \"1500\"\n"); |
| printf(" -b, --bsize The each buffer send size. default is \"200\"\n"); |
| printf(" -d, --direct Use direct buffer in sender. default is \"false\"\n"); |
| } |
| |
| static void |
| init_memory_context() |
| { |
| if (NULL == TopMemoryContext) |
| { |
| assert(NULL == testMemoryContext); |
| MemoryContextInit(); |
| |
| testMemoryContext = AllocSetContextCreate(TopMemoryContext, |
| "Test Context", |
| ALLOCSET_DEFAULT_MINSIZE, |
| ALLOCSET_DEFAULT_INITSIZE, |
| ALLOCSET_DEFAULT_MAXSIZE); |
| |
| MemoryContextSwitchTo(testMemoryContext); |
| } |
| } |
| |
| static void |
| destroy_memory_context() |
| { |
| MemoryContextReset(testMemoryContext); |
| testMemoryContext = NULL; |
| TopMemoryContext = NULL; |
| CurrentMotionIPCLayer = NULL; |
| } |
| |
| static EState * |
| client_side_setup(const struct bench_options *options, int stc[2], int cts[2]) |
| { |
| int32 listen_port = 0; |
| int32 server_listen_port = 0; |
| pid_t c_pid = 0; |
| int8 already_setup = 1; |
| EState *estate; |
| |
| am_client_side = true; |
| client_side_global_var_init(options->ipc_layer, &client_ic_proxy_pid); |
| Gp_max_packet_size = options->mtu; |
| |
| CurrentMotionIPCLayer->InitMotionLayerIPC(); |
| |
| listen_port = CurrentMotionIPCLayer->GetListenPort(); |
| if (listen_port == 0) |
| { |
| printf("failed to init motion layer ipc."); |
| return NULL; |
| } |
| |
| write_data_to_pipe(cts, listen_port, int32); |
| read_data_from_pipe(stc, &server_listen_port, int32); |
| |
| c_pid = getpid(); |
| write_data_to_pipe(cts, c_pid, pid_t); |
| read_data_from_pipe(stc, &server_side_pid, pid_t); |
| |
| estate = prepare_estate( /* local_slice */ 1, server_listen_port, |
| listen_port, |
| server_side_pid, |
| c_pid); |
| |
| CurrentMotionIPCLayer->SetupInterconnect(estate); |
| if (!estate->es_interconnect_is_setup || !estate->interconnect_context) |
| { |
| cleanup_estate(estate); |
| printf("failed to setup motion layer ipc."); |
| return NULL; |
| } |
| |
| write_data_to_pipe(cts, already_setup, int8); |
| return estate; |
| } |
| |
| EState * |
| server_side_setup(const struct bench_options *options, |
| int stc[2], |
| int cts[2]) |
| { |
| int32 listen_port = 0; |
| int32 client_listen_port = 0; |
| int8 already_setup = 0; |
| pid_t c_pid = 0; |
| EState *estate; |
| |
| am_client_side = false; |
| server_side_global_var_init(options->ipc_layer, &server_ic_proxy_pid); |
| Gp_max_packet_size = options->mtu; |
| |
| CurrentMotionIPCLayer->InitMotionLayerIPC(); |
| |
| listen_port = CurrentMotionIPCLayer->GetListenPort(); |
| if (listen_port == 0) |
| { |
| printf("failed to init motion layer ipc."); |
| return NULL; |
| } |
| |
| read_data_from_pipe(cts, &client_listen_port, int32); |
| write_data_to_pipe(stc, listen_port, int32); |
| |
| c_pid = getpid(); |
| read_data_from_pipe(cts, &client_side_pid, pid_t); |
| write_data_to_pipe(stc, c_pid, pid_t); |
| |
| estate = prepare_estate( /* local_slice */ 0, |
| listen_port, |
| client_listen_port, |
| c_pid, |
| client_side_pid); |
| |
| CurrentMotionIPCLayer->SetupInterconnect(estate); |
| if (!estate->es_interconnect_is_setup || !estate->interconnect_context) |
| { |
| cleanup_estate(estate); |
| printf("failed to setup motion layer ipc."); |
| return NULL; |
| } |
| |
| /* waiting for client setup */ |
| read_data_from_pipe(cts, &already_setup, int8); |
| if (already_setup != 1) |
| { |
| printf("failed to recv client setup signal"); |
| cleanup_estate(estate); |
| return NULL; |
| } |
| |
| return estate; |
| } |
| |
| void |
| sig_handler(int sig_num) |
| { |
| switch (sig_num) |
| { |
| case SIGALRM: |
| case SIGUSR1: |
| { |
| interrupt_flag = true; |
| break; |
| } |
| default: |
| { |
| /* do nothing */ |
| break; |
| } |
| } |
| } |
| |
| void |
| sig_stop() |
| { |
| interrupt_flag = true; |
| if (am_client_side) |
| { |
| assert(server_side_pid != -1); |
| kill(server_side_pid, SIGUSR1); |
| } |
| else |
| { |
| assert(client_side_pid != -1); |
| kill(client_side_pid, SIGUSR1); |
| } |
| } |
| |
| static TupleChunkListData * |
| build_chunk_tuple_slot(EState *estate, size_t size) |
| { |
| TupleChunkListData *tc_list; |
| char tc_list_raw_buffer[size]; |
| |
| generate_seq_buffer(tc_list_raw_buffer, size); |
| tc_list = prepare_chunk_list_raw_data(tc_list_raw_buffer, size); |
| |
| return tc_list; |
| } |
| |
| static bool |
| measure_chunk_tuple_list(char *verify_buffer, int verify_buff_len, TupleChunkListItem tc_item, |
| uint64 *total_recv_size, uint64 *total_recv_chunk_item_counts) |
| { |
| |
| TupleChunkListItem p_curr = tc_item; |
| TupleChunkListItem p_last; |
| |
| if (!p_curr) |
| { |
| printf("recv got empty TupleChunkListItem.\n"); |
| return false; |
| } |
| |
| while (p_curr) |
| { |
| if (verify_buffer && verify_buff_len != 0 && !verify_chunk_list_raw_data(p_curr, verify_buffer, verify_buff_len)) |
| { |
| printf("recv TupleChunkListItem not matched.\n"); |
| return false; |
| } |
| |
| *total_recv_size = *total_recv_size + p_curr->chunk_length; |
| (*total_recv_chunk_item_counts)++; |
| p_last = p_curr; |
| p_curr = p_curr->p_next; |
| pfree(p_last); |
| } |
| |
| return true; |
| } |
| |
| static void |
| print_direct_mode_summary(const struct bench_options *options, |
| const uint64 direct_hit, |
| const uint64 non_direct_hit) |
| { |
| char pbuff[1024 * 100]; |
| int n = 0; |
| |
| setbuf(stdout, NULL); |
| n = sprintf(pbuff, "+----------------+------------+\n"); |
| n += sprintf(pbuff + n, "| %-14s | %10ld |\n", "Direct hits", direct_hit); |
| n += sprintf(pbuff + n, "| %-14s | %10ld |\n", "Ndirect hits", non_direct_hit); |
| /* non-direct hits/direct hits */ |
| n += sprintf(pbuff + n, "| %-14s | %10.4f |\n", "n/d hits rate", direct_hit == 0 |
| ? 0 : (double) non_direct_hit / direct_hit); |
| /* buffer size/mtu */ |
| n += sprintf(pbuff + n, "| %-14s | %10.4f |\n", "b/m ratio", (double) options->bsize / options->mtu); |
| sprintf(pbuff + n, "+----------------+------------+\n"); |
| |
| printf("%s", pbuff); |
| } |
| |
| |
| void |
| client_loop(const struct bench_options *options, int stc[2], int cts[2]) |
| { |
| EState *estate; |
| struct itimerval timeout_val; |
| bool has_error = false; |
| TupleChunkListData *tc_list_raw_buffer; |
| struct directTransportBuffer direct_buffer; |
| int8 already_stop = 0; |
| |
| uint64 direct_hit = 0; |
| uint64 non_direct_hit = 0; |
| |
| init_memory_context(); |
| estate = client_side_setup(options, stc, cts); |
| if (!estate) |
| { |
| /* can not use signal to notify server side now */ |
| printf("client side setup failed.\n"); |
| return; |
| } |
| |
| signal(SIGALRM, sig_handler); |
| signal(SIGUSR1, sig_handler); |
| |
| tc_list_raw_buffer = build_chunk_tuple_slot(estate, options->bsize); |
| |
| timeout_val.it_value.tv_sec = options->interval; |
| timeout_val.it_value.tv_usec = 0; |
| timeout_val.it_interval.tv_sec = 0; |
| timeout_val.it_interval.tv_usec = 0; |
| setitimer(ITIMER_REAL, &timeout_val, NULL); |
| |
| while (true) |
| { |
| if (interrupt_flag) |
| { |
| kill(server_side_pid, SIGALRM); |
| int n = 0; |
| |
| if (fcntl(stc[0], F_SETFL, fcntl(stc[0], F_GETFL) | O_NONBLOCK) != 0) |
| { |
| printf("client side exit failed.\n"); |
| return; |
| } |
| |
| /* waiting for server side stuck in recv or break. */ |
| sleep(0.1); |
| while ((n = read(stc[0], &already_stop, sizeof(int8))) < 0) |
| { |
| if (errno == EAGAIN || errno == EWOULDBLOCK) |
| { |
| CurrentMotionIPCLayer->SendTupleChunkToAMS(estate->interconnect_context, 1, 0, tc_list_raw_buffer->p_first); |
| continue; |
| } |
| else |
| { |
| printf("client side read signal failed failed.\n"); |
| } |
| } |
| break; |
| } |
| |
| if (options->direct_buffer) |
| { |
| CurrentMotionIPCLayer->GetTransportDirectBuffer(estate->interconnect_context, 1, 0, &direct_buffer); |
| if (direct_buffer.prilen < tc_list_raw_buffer->p_first->chunk_length) |
| { |
| non_direct_hit++; |
| CurrentMotionIPCLayer->SendTupleChunkToAMS(estate->interconnect_context, 1, 0, tc_list_raw_buffer->p_first); |
| } |
| else |
| { |
| direct_hit++; |
| memcpy(direct_buffer.pri, tc_list_raw_buffer->p_first->chunk_data, tc_list_raw_buffer->p_first->chunk_length); |
| CurrentMotionIPCLayer->PutTransportDirectBuffer(estate->interconnect_context, 1, 0, tc_list_raw_buffer->p_first->chunk_length); |
| } |
| } |
| else |
| { |
| CurrentMotionIPCLayer->SendTupleChunkToAMS(estate->interconnect_context, 1, 0, tc_list_raw_buffer->p_first); |
| } |
| } |
| |
| if (options->direct_buffer) |
| { |
| print_direct_mode_summary(options, direct_hit, non_direct_hit); |
| } |
| |
| |
| CurrentMotionIPCLayer->TeardownInterconnect(estate->interconnect_context, &has_error); |
| assert(!has_error); |
| |
| CurrentMotionIPCLayer->CleanUpMotionLayerIPC(); |
| |
| shutdown_ic_proxy_if_need(client_ic_proxy_pid); |
| |
| cleanup_estate(estate); |
| destroy_memory_context(); |
| } |
| |
| static void |
| print_summary(const double elapsed_time, const uint32 loop_times, |
| const uint64 total_recv_size, const uint64 total_recv_chunk_item_counts) |
| { |
| char pbuff[1024 * 100]; |
| int n = 0; |
| |
| setbuf(stdout, NULL); |
| n = sprintf(pbuff, "+----------------+------------+\n"); |
| n += sprintf(pbuff + n, "| %-14s | %10.3f |\n", "Total time(s)", elapsed_time / 1000); |
| n += sprintf(pbuff + n, "| %-14s | %10ld |\n", "Loop times", loop_times); |
| n += sprintf(pbuff + n, "| %-14s | %10.3f |\n", "LPS(l/ms)", (double) (loop_times / elapsed_time)); |
| n += sprintf(pbuff + n, "| %-14s | %10ld |\n", "Recv mbs", total_recv_size / 1024 / 1024); |
| n += sprintf(pbuff + n, "| %-14s | %10.3f |\n", "TPS(mb/s)", (double) (total_recv_size / (elapsed_time / 1000) / 1024 / 1024)); |
| n += sprintf(pbuff + n, "| %-14s | %10ld |\n", "Recv counts", total_recv_chunk_item_counts); |
| n += sprintf(pbuff + n, "| %-14s | %10.3f |\n", "Items ops/ms", (double) (total_recv_chunk_item_counts / (elapsed_time))); |
| sprintf(pbuff + n, "+----------------+------------+\n"); |
| printf("%s", pbuff); |
| } |
| |
| void |
| server_loop(const struct bench_options *options, int stc[2], int cts[2]) |
| { |
| EState *estate; |
| struct timeval start_time, |
| end_time; |
| double elapsed_time; |
| TupleChunkListItem tc_item; |
| bool has_error = false; |
| uint32 loop_times = 0; |
| uint64 total_recv_size = 0; |
| uint64 total_recv_chunk_item_counts = 0; |
| char *tc_item_raw_verify_buff; |
| int tc_item_raw_verify_buff_len = 0; |
| int8 already_stop = 1; |
| |
| init_memory_context(); |
| estate = server_side_setup(options, stc, cts); |
| if (!estate) |
| { |
| /* can not use signal to notify client side now */ |
| printf("server side setup failed.\n"); |
| return; |
| } |
| |
| signal(SIGALRM, sig_handler); |
| signal(SIGUSR1, sig_handler); |
| |
| if (options->should_verify) |
| { |
| tc_item_raw_verify_buff_len = options->bsize; |
| tc_item_raw_verify_buff = palloc(tc_item_raw_verify_buff_len); |
| generate_seq_buffer(tc_item_raw_verify_buff, tc_item_raw_verify_buff_len); |
| } |
| |
| gettimeofday(&start_time, NULL); |
| while (true) |
| { |
| if (interrupt_flag) |
| { |
| write_data_to_pipe(stc, already_stop, int8); |
| break; |
| } |
| |
| tc_item = CurrentMotionIPCLayer->RecvTupleChunkFrom(estate->interconnect_context, 1, 0); |
| if (!measure_chunk_tuple_list(tc_item_raw_verify_buff, |
| tc_item_raw_verify_buff_len, |
| tc_item, |
| &total_recv_size, |
| &total_recv_chunk_item_counts)) |
| { |
| sig_stop(); |
| } |
| |
| if (CurrentMotionIPCLayer->ic_type == INTERCONNECT_TYPE_UDPIFC) |
| { |
| CurrentMotionIPCLayer->DirectPutRxBuffer(estate->interconnect_context, 1, 0); |
| } |
| |
| loop_times++; |
| } |
| |
| gettimeofday(&end_time, NULL); |
| elapsed_time = (double) (end_time.tv_sec - start_time.tv_sec) * 1000 + |
| (double) (end_time.tv_usec - start_time.tv_usec) / 1000; |
| |
| print_summary(elapsed_time, loop_times, total_recv_size, total_recv_chunk_item_counts); |
| |
| if (options->should_verify) |
| { |
| pfree(tc_item_raw_verify_buff); |
| } |
| |
| CurrentMotionIPCLayer->TeardownInterconnect(estate->interconnect_context, &has_error); |
| assert(!has_error); |
| |
| CurrentMotionIPCLayer->CleanUpMotionLayerIPC(); |
| shutdown_ic_proxy_if_need(server_ic_proxy_pid); |
| |
| cleanup_estate(estate); |
| destroy_memory_context(); |
| } |
| |
| |
| int |
| main(int argc, char *argv[]) |
| { |
| int stc[2], |
| cts[2]; |
| pid_t f_pid; |
| int c; |
| |
| struct bench_options options = { |
| .ic_type = 0, |
| .interval = 60, |
| .should_verify = false, |
| /* the default MTU is 8192 in GUC |
| * but in a production environment, DBA will generally set it to 1500 |
| */ |
| .mtu = 1500, |
| .bsize = TUPLE_CHUNK_RAW_BUFFER_LEN, |
| .ipc_layer = NULL, |
| .direct_buffer = false |
| }; |
| |
| progname = get_progname(argv[0]); |
| if (argc > 1) |
| { |
| if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0 |
| || strcmp(argv[1], "-h") == 0) |
| { |
| usage(); |
| exit(0); |
| } |
| } |
| |
| optind = 1; |
| while (optind < argc) |
| { |
| while ((c = getopt_long(argc, argv, "t:i:vm:b:d", |
| long_options, NULL)) != -1) |
| { |
| switch (c) |
| { |
| case 't': |
| { |
| char *type_c; |
| |
| type_c = strdup(optarg); |
| options.ic_type = atoi(type_c); |
| free(type_c); |
| break; |
| } |
| case 'i': |
| { |
| char *interval_c; |
| |
| interval_c = strdup(optarg); |
| options.interval = atoi(interval_c); |
| free(interval_c); |
| break; |
| } |
| case 'v': |
| { |
| options.should_verify = true; |
| break; |
| } |
| case 'm': |
| { |
| char *mtu_c; |
| |
| mtu_c = strdup(optarg); |
| options.mtu = atoi(mtu_c); |
| free(mtu_c); |
| break; |
| } |
| case 'b': |
| { |
| char *buffer_size_c; |
| |
| buffer_size_c = strdup(optarg); |
| options.bsize = atoi(buffer_size_c); |
| free(buffer_size_c); |
| break; |
| } |
| case 'd': |
| { |
| options.direct_buffer = true; |
| break; |
| } |
| default: |
| { |
| /* do nothing */ |
| break; |
| } |
| } |
| } |
| } |
| |
| switch (options.ic_type) |
| { |
| case INTERCONNECT_TYPE_TCP: |
| { |
| options.ipc_layer = &tcp_ipc_layer; |
| break; |
| } |
| case INTERCONNECT_TYPE_UDPIFC: |
| { |
| options.ipc_layer = &udpifc_ipc_layer; |
| break; |
| } |
| case INTERCONNECT_TYPE_PROXY: |
| { |
| options.ipc_layer = &proxy_ipc_layer; |
| break; |
| } |
| default: |
| { |
| printf("invalid of args -t/--type %d\n", options.ic_type); |
| usage(); |
| return -1; |
| } |
| } |
| |
| if (options.interval <= 1) |
| { |
| printf("invalid of args -i/--interval %d\n", options.interval); |
| usage(); |
| return -1; |
| } |
| |
| if (options.mtu < MTU_LESS_LEN) |
| { |
| printf("invalid of args -m/--mtu %d, should not less than \n", options.mtu, MTU_LESS_LEN); |
| usage(); |
| return -1; |
| } |
| |
| if (options.ic_type == INTERCONNECT_TYPE_PROXY && options.mtu != 1500) |
| { |
| printf("invalid of args -m/--mtu %d, proxy not allow setting mtu. \n", options.mtu); |
| usage(); |
| return -1; |
| } |
| |
| if (options.bsize <= 0 || options.bsize >= (options.mtu - MTU_LESS_LEN)) |
| { |
| printf("invalid of args -b/--bsize %d, should not bigger than (mtu - header).\n", options.bsize); |
| usage(); |
| return -1; |
| } |
| |
| if (pipe(stc) < 0) |
| { |
| printf("pipe created failed. errno: %d\n", errno); |
| return -1; |
| } |
| |
| if (pipe(cts) < 0) |
| { |
| printf("pipe created failed. errno: %d\n", errno); |
| close(stc[0]); |
| close(stc[1]); |
| return -1; |
| } |
| |
| f_pid = fork(); |
| |
| if (f_pid < 0) |
| { |
| printf("fork failed. errno: %d\n", errno); |
| close(stc[0]); |
| close(stc[1]); |
| close(cts[0]); |
| close(cts[1]); |
| return -1; |
| } |
| |
| if (f_pid == 0) |
| { |
| client_loop(&options, stc, cts); |
| close(stc[0]); |
| close(stc[1]); |
| close(cts[0]); |
| close(cts[1]); |
| } |
| else |
| { |
| server_loop(&options, stc, cts); |
| } |
| |
| wait(NULL); |
| close(stc[0]); |
| close(stc[1]); |
| close(cts[0]); |
| close(cts[1]); |
| return 0; |
| } |