| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <stdlib.h> |
| #include <unistd.h> |
| #include <stdio.h> |
| #include <signal.h> |
| #include <gflags/gflags.h> |
| #include <gtest/gtest.h> |
| #include "butil/compat.h" |
| #include "butil/time.h" |
| #include "butil/macros.h" |
| #include "butil/errno.h" |
| #include <bthread/sys_futex.h> |
| #include <bthread/butex.h> |
| #include "bthread/bthread.h" |
| #include "butil/atomicops.h" |
| |
| namespace { |
| DEFINE_int32(thread_num, 1, "#pairs of threads doing ping pong"); |
| DEFINE_bool(loop, false, "run until ctrl-C is pressed"); |
| DEFINE_bool(use_futex, false, "use futex instead of pipe"); |
| DEFINE_bool(use_butex, false, "use butex instead of pipe"); |
| |
| void ALLOW_UNUSED (*ignore_sigpipe)(int) = signal(SIGPIPE, SIG_IGN); |
| |
| volatile bool stop = false; |
| void quit_handler(int) { |
| stop = true; |
| } |
| |
| struct BAIDU_CACHELINE_ALIGNMENT AlignedIntWrapper { |
| int value; |
| }; |
| |
| struct BAIDU_CACHELINE_ALIGNMENT PlayerArg { |
| int read_fd; |
| int write_fd; |
| int* wait_addr; |
| int* wake_addr; |
| long counter; |
| long wakeup; |
| }; |
| |
| void* pipe_player(void* void_arg) { |
| PlayerArg* arg = static_cast<PlayerArg*>(void_arg); |
| char dummy = '\0'; |
| while (1) { |
| ssize_t nr = read(arg->read_fd, &dummy, 1); |
| if (nr <= 0) { |
| if (nr == 0) { |
| printf("[%" PRIu64 "] EOF\n", pthread_numeric_id()); |
| break; |
| } |
| if (errno != EINTR) { |
| printf("[%" PRIu64 "] bad read, %m\n", pthread_numeric_id()); |
| break; |
| } |
| continue; |
| } |
| if (1L != write(arg->write_fd, &dummy, 1)) { |
| printf("[%" PRIu64 "] bad write, %m\n", pthread_numeric_id()); |
| break; |
| } |
| ++arg->counter; |
| } |
| return NULL; |
| } |
| |
| static const int INITIAL_FUTEX_VALUE = 0; |
| |
| void* futex_player(void* void_arg) { |
| PlayerArg* arg = static_cast<PlayerArg*>(void_arg); |
| int counter = INITIAL_FUTEX_VALUE; |
| while (!stop) { |
| int rc = bthread::futex_wait_private(arg->wait_addr, counter, NULL); |
| ++counter; |
| ++*arg->wake_addr; |
| bthread::futex_wake_private(arg->wake_addr, 1); |
| ++arg->counter; |
| arg->wakeup += (rc == 0); |
| } |
| return NULL; |
| } |
| |
| void* butex_player(void* void_arg) { |
| PlayerArg* arg = static_cast<PlayerArg*>(void_arg); |
| int counter = INITIAL_FUTEX_VALUE; |
| while (!stop) { |
| int rc = bthread::butex_wait(arg->wait_addr, counter, NULL); |
| ++counter; |
| ++*arg->wake_addr; |
| bthread::butex_wake(arg->wake_addr); |
| ++arg->counter; |
| arg->wakeup += (rc == 0); |
| } |
| return NULL; |
| } |
| |
| TEST(PingPongTest, ping_pong) { |
| signal(SIGINT, quit_handler); |
| stop = false; |
| PlayerArg* args[FLAGS_thread_num]; |
| |
| for (int i = 0; i < FLAGS_thread_num; ++i) { |
| int pipe1[2]; |
| int pipe2[2]; |
| if (!FLAGS_use_futex && !FLAGS_use_butex) { |
| ASSERT_EQ(0, pipe(pipe1)); |
| ASSERT_EQ(0, pipe(pipe2)); |
| } |
| |
| PlayerArg* arg1 = new PlayerArg; |
| if (!FLAGS_use_futex && !FLAGS_use_butex) { |
| arg1->read_fd = pipe1[0]; |
| arg1->write_fd = pipe2[1]; |
| } else if (FLAGS_use_futex) { |
| AlignedIntWrapper* w1 = new AlignedIntWrapper; |
| w1->value = INITIAL_FUTEX_VALUE; |
| AlignedIntWrapper* w2 = new AlignedIntWrapper; |
| w2->value = INITIAL_FUTEX_VALUE; |
| arg1->wait_addr = &w1->value; |
| arg1->wake_addr = &w2->value; |
| } else if (FLAGS_use_butex) { |
| arg1->wait_addr = bthread::butex_create_checked<int>(); |
| *arg1->wait_addr = INITIAL_FUTEX_VALUE; |
| arg1->wake_addr = bthread::butex_create_checked<int>(); |
| *arg1->wake_addr = INITIAL_FUTEX_VALUE; |
| } else { |
| ASSERT_TRUE(false); |
| } |
| arg1->counter = 0; |
| arg1->wakeup = 0; |
| args[i] = arg1; |
| |
| PlayerArg* arg2 = new PlayerArg; |
| if (!FLAGS_use_futex && !FLAGS_use_butex) { |
| arg2->read_fd = pipe2[0]; |
| arg2->write_fd = pipe1[1]; |
| } else { |
| arg2->wait_addr = arg1->wake_addr; |
| arg2->wake_addr = arg1->wait_addr; |
| } |
| arg2->counter = 0; |
| arg2->wakeup = 0; |
| |
| pthread_t th1, th2; |
| bthread_t bth1, bth2; |
| if (!FLAGS_use_futex && !FLAGS_use_butex) { |
| ASSERT_EQ(0, pthread_create(&th1, NULL, pipe_player, arg1)); |
| ASSERT_EQ(0, pthread_create(&th2, NULL, pipe_player, arg2)); |
| } else if (FLAGS_use_futex) { |
| ASSERT_EQ(0, pthread_create(&th1, NULL, futex_player, arg1)); |
| ASSERT_EQ(0, pthread_create(&th2, NULL, futex_player, arg2)); |
| } else if (FLAGS_use_butex) { |
| ASSERT_EQ(0, bthread_start_background(&bth1, NULL, butex_player, arg1)); |
| ASSERT_EQ(0, bthread_start_background(&bth2, NULL, butex_player, arg2)); |
| } else { |
| ASSERT_TRUE(false); |
| } |
| |
| if (!FLAGS_use_futex && !FLAGS_use_butex) { |
| // send the seed data. |
| unsigned char seed = 255; |
| ASSERT_EQ(1L, write(pipe1[1], &seed, 1)); |
| } else if (FLAGS_use_futex) { |
| ++*arg1->wait_addr; |
| bthread::futex_wake_private(arg1->wait_addr, 1); |
| } else if (FLAGS_use_butex) { |
| ++*arg1->wait_addr; |
| bthread::butex_wake(arg1->wait_addr); |
| } else { |
| ASSERT_TRUE(false); |
| } |
| } |
| |
| long last_counter = 0; |
| long last_wakeup = 0; |
| while (!stop) { |
| butil::Timer tm; |
| tm.start(); |
| sleep(1); |
| tm.stop(); |
| long cur_counter = 0; |
| long cur_wakeup = 0; |
| for (int i = 0; i < FLAGS_thread_num; ++i) { |
| cur_counter += args[i]->counter; |
| cur_wakeup += args[i]->wakeup; |
| } |
| if (FLAGS_use_futex || FLAGS_use_butex) { |
| printf("pingpong-ed %" PRId64 "/s, wakeup=%" PRId64 "/s\n", |
| (cur_counter - last_counter) * 1000L / tm.m_elapsed(), |
| (cur_wakeup - last_wakeup) * 1000L / tm.m_elapsed()); |
| } else { |
| printf("pingpong-ed %" PRId64 "/s\n", |
| (cur_counter - last_counter) * 1000L / tm.m_elapsed()); |
| } |
| last_counter = cur_counter; |
| last_wakeup = cur_wakeup; |
| if (!FLAGS_loop) { |
| break; |
| } |
| } |
| stop = true; |
| // Program quits, Let resource leak. |
| } |
| } // namespace |