// blob: 8fb810d9f0745b4cecfa8ce9626d39cf411cb180 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gtest/gtest.h>
#include <bthread/execution_queue.h>
#include <bthread/sys_futex.h>
#include <bthread/countdown_event.h>
#include "butil/time.h"
#include "butil/fast_rand.h"
#include "butil/gperftools_profiler.h"
namespace {
// Written by the executors in this file to record whether the queue reported
// itself as stopped; reset to false by ExecutionQueueTest::SetUp().
bool stopped = false;
class ExecutionQueueTest : public testing::Test {
void SetUp() { stopped = false; }
void TearDown() {}
struct LongIntTask {
long value;
bthread::CountdownEvent* event;
LongIntTask(long v)
: value(v), event(NULL)
LongIntTask(long v, bthread::CountdownEvent* e)
: value(v), event(e)
LongIntTask() : value(0), event(NULL) {}
// Executor that accumulates task values.
// `meta` must point to an int64_t; every task's value is added to it and the
// task's event (if any) is signaled. Also mirrors the queue's stopped state
// into the global `stopped`. Always returns 0.
int add(void* meta, bthread::TaskIterator<LongIntTask> &iter) {
    stopped = iter.is_queue_stopped();
    int64_t* result = static_cast<int64_t*>(meta);
    for (; iter; ++iter) {
        *result += iter->value;
        if (iter->event) { iter->event->signal(); }
    }
    return 0;
}
// Pushes 0..99 from a single thread and checks that, after stop + join, the
// executor has summed every value. Also checks that executing on a stopped
// queue is rejected.
TEST_F(ExecutionQueueTest, single_thread) {
    int64_t result = 0;
    int64_t expected_result = 0;
    stopped = false;
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 };
    bthread::ExecutionQueueOptions options;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                add, &result));
    for (int i = 0; i < 100; ++i) {
        expected_result += i;
        ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, i));
    }
    LOG(INFO) << "stop";
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    // A stopped queue must refuse new tasks.
    ASSERT_NE(0, bthread::execution_queue_execute(queue_id, 0));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(expected_result, result);
}
// Argument block shared between a test and its pusher threads.
struct PushArg {
    bthread::ExecutionQueueId<LongIntTask> id {0};   // queue the pushers write to
    butil::atomic<int64_t> total_num {0};            // accumulated by pushers; read by the performance test
    butil::atomic<int64_t> total_time {0};           // accumulated by pushers; divided by total_num in the log
    butil::atomic<int64_t> expected_value {0};       // sum of all values successfully pushed
    volatile bool stopped {false};                   // set by the tests; not read by the pushers in this file
    bool wait_task_completed {false};                // when true, push_thread attaches an event to each task
};
void* push_thread(void *arg) {
PushArg* pa = (PushArg*)arg;
int64_t sum = 0;
butil::Timer timer;
int num = 0;
bthread::CountdownEvent e;
LongIntTask t(num, pa->wait_task_completed ? &e : NULL);
if (pa->wait_task_completed) {
while (bthread::execution_queue_execute(pa->id, t) == 0) {
sum += num;
t.value = ++num;
if (pa->wait_task_completed) {
pa->expected_value.fetch_add(sum, butil::memory_order_relaxed);
return NULL;
void* push_thread_which_addresses_execq(void *arg) {
PushArg* pa = (PushArg*)arg;
int64_t sum = 0;
butil::Timer timer;
int num = 0;
bthread::ExecutionQueue<LongIntTask>::scoped_ptr_t ptr
= bthread::execution_queue_address(pa->id);
while (ptr->execute(num) == 0) {
sum += num;
pa->expected_value.fetch_add(sum, butil::memory_order_relaxed);
return NULL;
// Rough throughput measurement with 8 pusher threads, run twice: first with
// pushers that address the queue once, then with pushers that pass the queue
// id on every call. Each round runs for ~500ms, then checks that the executor
// summed exactly what the pushers report having pushed.
TEST_F(ExecutionQueueTest, performance) {
    pthread_t threads[8];
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    int64_t result = 0;

    // Round 1: pushers hold an addressed queue pointer.
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                add, &result));
    PushArg pa;
    pa.id = queue_id;
    pa.total_num = 0;
    pa.total_time = 0;
    pa.expected_value = 0;
    pa.stopped = false;
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_create(&threads[i], NULL, &push_thread_which_addresses_execq, &pa);
    }
    usleep(500 * 1000);
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_join(threads[i], NULL);
    }
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(pa.expected_value.load(), result);
    {
        // Guard the average: total_num is 0 if no push was counted.
        const int64_t pushed = pa.total_num.load();
        const int64_t avg_ns = pushed > 0 ? pa.total_time.load() / pushed : 0;
        LOG(INFO) << "With addressed execq, each execution_queue_execute takes "
                  << avg_ns
                  << " total_num=" << pushed
                  << " ns with " << ARRAY_SIZE(threads) << " threads";
    }

    // Round 2: pushers pass the queue id explicitly on every call.
    result = 0;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                add, &result));
    pa.id = queue_id;
    pa.total_num = 0;
    pa.total_time = 0;
    pa.expected_value = 0;
    pa.stopped = false;
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_create(&threads[i], NULL, &push_thread, &pa);
    }
    usleep(500 * 1000);
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_join(threads[i], NULL);
    }
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(pa.expected_value.load(), result);
    {
        const int64_t pushed = pa.total_num.load();
        const int64_t avg_ns = pushed > 0 ? pa.total_time.load() / pushed : 0;
        LOG(INFO) << "With id explicitly, execution_queue_execute takes "
                  << avg_ns
                  << " total_num=" << pushed
                  << " ns with " << ARRAY_SIZE(threads) << " threads";
    }
}
volatile bool g_suspending = false;
volatile bool g_should_be_urgent = false;
int urgent_times = 0;
// Executor for the urgent-task tests. `meta` must point to an int64_t sum.
//  - Queue stopped: records that in `stopped` and returns.
//  - g_should_be_urgent armed: the batch is expected to consist of exactly one
//    task whose value is -1 (the urgent task).
//  - Otherwise: adds task values to the sum. On a task valued -100 it raises
//    g_suspending, parks until the test clears the flag, arms
//    g_should_be_urgent and returns immediately.
// Always returns 0.
int add_with_suspend(void* meta, bthread::TaskIterator<LongIntTask>& iter) {
    int64_t* result = static_cast<int64_t*>(meta);
    if (iter.is_queue_stopped()) {
        stopped = true;
        return 0;
    }
    if (g_should_be_urgent) {
        g_should_be_urgent = false;
        EXPECT_EQ(-1, iter->value) << urgent_times;
        if (iter->event) { iter->event->signal(); }
        // Step past the urgent task first: checking the iterator while it
        // still points at the task just dereferenced could never succeed.
        ++iter;
        EXPECT_FALSE(iter) << urgent_times;
        ++urgent_times;
        return 0;
    }
    for (; iter; ++iter) {
        if (iter->value == -100) {
            g_suspending = true;
            while (g_suspending) {
                // Spin until the test thread clears the flag.
                // NOTE(review): a yielding sleep would be kinder to the
                // worker; confirm which sleep primitive is appropriate here.
            }
            g_should_be_urgent = true;
            if (iter->event) { iter->event->signal(); }
            return 0;
        }
        *result += iter->value;
        if (iter->event) { iter->event->signal(); }
    }
    return 0;
}
// While 10 pusher threads feed normal tasks, repeatedly parks the executor
// with a -100 task and submits an urgent -1 task; add_with_suspend verifies
// that the urgent task is what it sees right after resuming. Finally checks
// the accumulated sum against what the pushers report.
TEST_F(ExecutionQueueTest, execute_urgent) {
    g_should_be_urgent = false;
    pthread_t threads[10];
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    int64_t result = 0;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                add_with_suspend, &result));
    PushArg pa;
    pa.id = queue_id;
    pa.total_num = 0;
    pa.total_time = 0;
    pa.expected_value = 0;
    pa.stopped = false;
    pa.wait_task_completed = true;
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_create(&threads[i], NULL, &push_thread, &pa);
    }
    g_suspending = false;
    for (int i = 0; i < 100; ++i) {
        ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, -100));
        // Wait for the executor to reach the -100 task and park itself.
        while (!g_suspending) {
            usleep(100);
        }
        ASSERT_EQ(0, bthread::execution_queue_execute(
                      queue_id, -1, &bthread::TASK_OPTIONS_URGENT));
        g_suspending = false;
    }
    usleep(500* 1000);
    pa.stopped = true;
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_join(threads[i], NULL);
    }
    LOG(INFO) << "result=" << result;
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(pa.expected_value.load(), result);
}
// Parks the executor on a -100 task, queues 1..99 as normal tasks and then one
// urgent -1 task behind them, and resumes. add_with_suspend checks that the
// urgent task is delivered first; the final sum must equal 1+...+99.
TEST_F(ExecutionQueueTest, urgent_task_is_the_last_task) {
    g_should_be_urgent = false;
    g_suspending = false;
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    int64_t result = 0;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                add_with_suspend, &result));
    g_suspending = false;
    ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, -100));
    // Wait for the executor to reach the -100 task and park itself.
    while (!g_suspending) {
        usleep(100);
    }
    LOG(INFO) << "Going to push";
    int64_t expected = 0;
    for (int i = 1; i < 100; ++i) {
        expected += i;
        ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, i));
    }
    ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, -1, &bthread::TASK_OPTIONS_URGENT));
    g_suspending = false;
    usleep(10 * 1000);
    LOG(INFO) << "going to quit";
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(expected, result);
}
// next_task[t] is the sequence number check_order() expects next from pusher
// thread t; the order tests zero it with memset before starting.
long next_task[1024];
// Hands out pusher thread ids via fetch_add; it is not reset between tests in
// this file, so ids keep growing across the order tests.
butil::atomic<int> num_threads(0);
// Pusher thread body for the ordering test. `arg` carries the queue id value.
// Takes a fresh thread id from num_threads and pushes 100000 tasks whose value
// packs the thread id in the high 32 bits and the sequence number in the low
// 32 bits. The result of each execute() call is not checked. Returns NULL.
void* push_thread_with_id(void* arg) {
    bthread::ExecutionQueueId<LongIntTask> id = { (uint64_t)arg };
    const int thread_id = num_threads.fetch_add(1, butil::memory_order_relaxed);
    LOG(INFO) << "Start thread" << thread_id;
    for (int i = 0; i < 100000; ++i) {
        bthread::execution_queue_execute(id, ((long)thread_id << 32) | i);
    }
    return NULL;
}
// Executor that verifies per-thread FIFO order. Each task value packs the
// pusher's thread id (high 32 bits) and a sequence number (low 32 bits); the
// sequence number must match next_task[thread_id], which is then advanced.
//
// `meta` must point to a long: it is incremented for every out-of-order task
// so that the caller's final "disorder count is zero" assertion is meaningful.
// Always returns 0.
int check_order(void* meta, bthread::TaskIterator<LongIntTask>& iter) {
    long* disorder_times = static_cast<long*>(meta);
    for (; iter; ++iter) {
        const long value = iter->value;
        const int thread_id = value >> 32;
        const long task = value & 0xFFFFFFFFul;
        if (task != next_task[thread_id]++) {
            EXPECT_TRUE(false) << "task=" << task << " thread_id=" << thread_id;
            ++*disorder_times;
        }
        if (iter->event) { iter->event->signal(); }
    }
    return 0;
}
// Twelve threads push sequenced tasks concurrently; check_order verifies that
// the tasks of each individual thread are executed in the order they were
// pushed.
TEST_F(ExecutionQueueTest, multi_threaded_order) {
    memset(next_task, 0, sizeof(next_task));
    long disorder_times = 0;
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                check_order, &disorder_times));
    pthread_t threads[12];
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_create(&threads[i], NULL, &push_thread_with_id, (void *)queue_id.value);
    }
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_join(threads[i], NULL);
    }
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(0, disorder_times);
}
// Executor that drains its batch and then expects to be running on the
// pthread whose id was passed (cast to void*) as `arg`. Does nothing when the
// queue is stopped. Always returns 0.
int check_running_thread(void* arg, bthread::TaskIterator<LongIntTask>& iter) {
    if (iter.is_queue_stopped()) {
        return 0;
    }
    for (; iter; ++iter) {}
    EXPECT_EQ(pthread_self(), (pthread_t)arg);
    return 0;
}
// A task submitted with TASK_OPTIONS_INPLACE is expected to run on the
// submitting thread: the executor receives this thread's id as its meta
// argument and compares it with pthread_self().
TEST_F(ExecutionQueueTest, in_place_task) {
    pthread_t thread_id = pthread_self();
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                check_running_thread,
                                                (void*)thread_id));
    ASSERT_EQ(0, bthread::execution_queue_execute(
                  queue_id, 0, &bthread::TASK_OPTIONS_INPLACE));
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
}
// Task for the in-place scheduling test.
struct InPlaceTask {
    bool first_task;      // true only for the task pushed by run_first_tasks()
    pthread_t thread_id;  // pthread that submitted the task
};
void *run_first_tasks(void* arg) {
bthread::ExecutionQueueId<InPlaceTask> queue_id = { (uint64_t)arg };
InPlaceTask task;
task.first_task = true;
task.thread_id = pthread_self();
EXPECT_EQ(0, bthread::execution_queue_execute(queue_id, task,
return NULL;
// Executor for should_start_new_thread_on_more_tasks. `arg` must point to a
// butil::atomic<int> used as a futex shared with the test:
//  - first task: must run on the pushing thread; publishes 1 to tell the test
//    that the executor is now occupied, then blocks until the test publishes 2.
//  - later tasks: must NOT run on the thread that pushed them.
// Does nothing when the queue is stopped. Always returns 0.
int stuck_and_check_running_thread(void* arg, bthread::TaskIterator<InPlaceTask>& iter) {
    if (iter.is_queue_stopped()) {
        return 0;
    }
    butil::atomic<int>* futex = static_cast<butil::atomic<int>*>(arg);
    if (iter->first_task) {
        EXPECT_EQ(pthread_self(), iter->thread_id);
        // The test waits for the value 1, so it has to be stored before the
        // wake-up; waking alone would leave the waiter blocked forever.
        futex->store(1);
        bthread::futex_wake_private(futex, 1);
        while (futex->load() != 2) {
            bthread::futex_wait_private(futex, 1, NULL);
        }
    } else {
        for (; iter; ++iter) {
            EXPECT_NE(pthread_self(), iter->thread_id);
        }
    }
    return 0;
}
// While the executor is stuck inside the first (in-place) task on another
// pthread, 100 more tasks are submitted from this thread. The executor checks
// that those later tasks do not run on the thread that pushed them.
TEST_F(ExecutionQueueTest, should_start_new_thread_on_more_tasks) {
    bthread::ExecutionQueueId<InPlaceTask> queue_id = { 0 };
    bthread::ExecutionQueueOptions options;
    butil::atomic<int> futex(0);
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                stuck_and_check_running_thread,
                                                (void*)&futex));
    pthread_t thread;
    ASSERT_EQ(0, pthread_create(&thread, NULL, run_first_tasks, (void*)queue_id.value));
    // Wait until the executor reports (futex == 1) that it is inside the
    // first task.
    while (futex.load() != 1) {
        bthread::futex_wait_private(&futex, 0, NULL);
    }
    for (size_t i = 0; i < 100; ++i) {
        InPlaceTask task;
        task.first_task = false;
        task.thread_id = pthread_self();
        // NOTE(review): in-place submission is assumed here, matching the test
        // name (the busy executor forces execution elsewhere) — confirm.
        ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, task,
                                                      &bthread::TASK_OPTIONS_INPLACE));
    }
    // Release the stuck executor: it waits for the value 2.
    futex.store(2);
    bthread::futex_wake_private(&futex, 1);
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    // Reclaim the helper thread so it is not leaked.
    pthread_join(thread, NULL);
}
// Same as push_thread_with_id(), except that every task is submitted with
// TASK_OPTIONS_INPLACE. `arg` carries the queue id value; each task value
// packs the thread id (high 32 bits) and sequence number (low 32 bits).
// The result of each execute() call is not checked. Returns NULL.
void* inplace_push_thread(void* arg) {
    bthread::ExecutionQueueId<LongIntTask> id = { (uint64_t)arg };
    const int thread_id = num_threads.fetch_add(1, butil::memory_order_relaxed);
    LOG(INFO) << "Start thread" << thread_id;
    for (int i = 0; i < 100000; ++i) {
        bthread::execution_queue_execute(id, ((long)thread_id << 32) | i,
                                         &bthread::TASK_OPTIONS_INPLACE);
    }
    return NULL;
}
// Ordering test with in-place submission: twelve threads push sequenced tasks
// via inplace_push_thread and check_order verifies per-thread order.
TEST_F(ExecutionQueueTest, inplace_and_order) {
    memset(next_task, 0, sizeof(next_task));
    long disorder_times = 0;
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                check_order, &disorder_times));
    pthread_t threads[12];
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_create(&threads[i], NULL, &inplace_push_thread, (void *)queue_id.value);
    }
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_join(threads[i], NULL);
    }
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(0, disorder_times);
}
// Informational only: logs the size of bthread::TaskNode; asserts nothing.
TEST_F(ExecutionQueueTest, size_of_task_node) {
    LOG(INFO) << "sizeof(TaskNode)=" << sizeof(bthread::TaskNode);
}
// Executor for the cancel test. `meta` must point to an int64_t sum.
// Adds task values to the sum; a task valued -100 is not added but raises
// g_suspending and parks the executor until the test clears the flag.
// Each task's event (if any) is signaled once the task has been handled.
// When the queue is stopped it only records that in `stopped`.
// Always returns 0.
int add_with_suspend2(void* meta, bthread::TaskIterator<LongIntTask>& iter) {
    int64_t* result = static_cast<int64_t*>(meta);
    if (iter.is_queue_stopped()) {
        stopped = true;
        return 0;
    }
    for (; iter; ++iter) {
        if (iter->value == -100) {
            g_suspending = true;
            while (g_suspending) {
                // Spin until the test thread clears the flag.
                // NOTE(review): a yielding sleep would be kinder to the
                // worker; confirm which sleep primitive is appropriate here.
            }
        } else {
            *result += iter->value;
        }
        if (iter->event) { iter->event->signal(); }
    }
    return 0;
}
// Exercises the three results of execution_queue_cancel() checked below:
//   1  for a task that is already being executed (handle0, twice),
//   0  for a task that is still queued (handle1),
//  -1  for a task that was already cancelled (handle1 again).
// The cancelled task must not contribute to the sum.
TEST_F(ExecutionQueueTest, cancel) {
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    int64_t result = 0;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                add_with_suspend2, &result));
    g_suspending = false;
    bthread::TaskHandle handle0;
    ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, -100, NULL, &handle0));
    // Wait for the executor to reach the -100 task and park itself.
    while (!g_suspending) {
        usleep(100);
    }
    ASSERT_EQ(1, bthread::execution_queue_cancel(handle0));
    ASSERT_EQ(1, bthread::execution_queue_cancel(handle0));
    bthread::TaskHandle handle1;
    ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, 100, NULL, &handle1));
    ASSERT_EQ(0, bthread::execution_queue_cancel(handle1));
    g_suspending = false;
    ASSERT_EQ(-1, bthread::execution_queue_cancel(handle1));
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(0, result);
}
// Task for the cancel_self test: carries a pointer to its own TaskHandle,
// published by the test after the task has been submitted.
struct CancelSelf {
    butil::atomic<bthread::TaskHandle*> handle;
};
int cancel_self(void* /*meta*/, bthread::TaskIterator<CancelSelf*>& iter) {
for (; iter; ++iter) {
while ((*iter)->handle == NULL) {
EXPECT_EQ(1, bthread::execution_queue_cancel(*(*iter)->handle.load()));
EXPECT_EQ(1, bthread::execution_queue_cancel(*(*iter)->handle.load()));
EXPECT_EQ(1, bthread::execution_queue_cancel(*(*iter)->handle.load()));
return 0;
// Submits one task and then hands the task its own handle, so that the
// executor (cancel_self) can attempt to cancel the task from inside itself.
TEST_F(ExecutionQueueTest, cancel_self) {
    bthread::ExecutionQueueId<CancelSelf*> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                cancel_self, NULL));
    CancelSelf task;
    task.handle = NULL;
    bthread::TaskHandle handle;
    ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, &task, NULL, &handle));
    // Publish the handle: the executor waits for a non-NULL pointer before it
    // proceeds, so without this store it would never finish.
    task.handle.store(&handle);
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
}
// Task for the random_cancel test: either a plain "add value" task, or (when
// cancel_task is set) a request to cancel the task identified by `handle`.
// Members are default-initialized so that copying a partially filled task
// never reads indeterminate values.
struct AddTask {
    int value {0};             // amount added to the sum by a plain task
    bool cancel_task {false};  // true: this task cancels `handle` instead of adding
    int cancel_value {0};      // value of the task that `handle` refers to
    bthread::TaskHandle handle;
};
// Executor state for the random_cancel test. Every member starts at zero so
// the counters are well-defined even before the test touches them.
struct AddMeta {
    int64_t sum {0};                         // running total of executed plain tasks
    butil::atomic<int64_t> expected {0};     // total that `sum` is compared against at the end
    butil::atomic<int64_t> succ_times {0};   // logged by the test
    butil::atomic<int64_t> race_times {0};   // logged by the test
    butil::atomic<int64_t> fail_times {0};   // logged by the test
};
int add_with_cancel(void* meta, bthread::TaskIterator<AddTask>& iter) {
if (iter.is_queue_stopped()) {
return 0;
AddMeta* m = (AddMeta*)meta;
for (; iter; ++iter) {
if (iter->cancel_task) {
const int rc = bthread::execution_queue_cancel(iter->handle);
if (rc == 0) {
} else if (rc < 0) {
} else {
} else {
m->sum += iter->value;
return 0;
// Submits 100000 add-tasks and, at random, tries to cancel each one either
// directly from this thread or through a follow-up cancel task handled by the
// executor. At the end the executed sum must equal the total of all values
// minus those whose cancellation succeeded.
TEST_F(ExecutionQueueTest, random_cancel) {
    bthread::ExecutionQueueId<AddTask> queue_id = { 0 };
    AddMeta m;
    m.sum = 0;
    m.expected.store(0);
    m.succ_times.store(0);
    m.race_times.store(0);
    m.fail_times.store(0);
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, NULL,
                                                add_with_cancel, &m));
    int64_t expected = 0;
    for (int i = 0; i < 100000; ++i) {
        bthread::TaskHandle h;
        AddTask t;
        t.value = i;
        t.cancel_task = false;
        t.cancel_value = 0;
        ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, t, NULL, &h));
        const int r = butil::fast_rand_less_than(4);
        expected += i;
        if (r == 0) {
            // Cancel directly; only a successful cancel removes the value.
            if (bthread::execution_queue_cancel(h) == 0) {
                expected -= i;
            }
        } else if (r == 1) {
            // Cancel through a normal-priority cancel task.
            t.cancel_task = true;
            t.cancel_value = i;
            t.handle = h;
            ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, t, NULL));
        } else if (r == 2) {
            // Cancel through a cancel task submitted with non-default options.
            // NOTE(review): urgent priority is assumed for this variant — confirm.
            t.cancel_task = true;
            t.cancel_value = i;
            t.handle = h;
            ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, t,
                                                          &bthread::TASK_OPTIONS_URGENT));
        } else {
            // do nothing;
        }
    }
    // Fold this thread's view into the shared counter; the executor has been
    // subtracting the values it cancelled successfully.
    m.expected.fetch_add(expected, butil::memory_order_relaxed);
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(m.sum, m.expected.load());
    LOG(INFO) << "sum=" << m.sum << " race_times=" << m.race_times
              << " succ_times=" << m.succ_times
              << " fail_times=" << m.fail_times;
}
// Executor that handles at most the task the iterator currently points at and
// never advances the iterator itself. `meta` must point to an int64_t sum.
// Always returns 0.
int add2(void* meta, bthread::TaskIterator<LongIntTask> &iter) {
    if (iter) {
        int64_t* result = static_cast<int64_t*>(meta);
        *result += iter->value;
        if (iter->event) { iter->event->signal(); }
    }
    return 0;
}
// Uses add2, which never advances the task iterator, and still expects every
// one of the 100 pushed values to end up in the sum.
TEST_F(ExecutionQueueTest, not_do_iterate_at_all) {
    int64_t result = 0;
    int64_t expected_result = 0;
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 };
    bthread::ExecutionQueueOptions options;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                add2, &result));
    for (int i = 0; i < 100; ++i) {
        expected_result += i;
        ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, i));
    }
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    // A stopped queue must refuse new tasks.
    ASSERT_NE(0, bthread::execution_queue_execute(queue_id, 0));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(expected_result, result);
}
// Executor for cancel_unexecuted_high_priority_task. `meta` must point to an
// int64_t sum. Adds task values to the sum; a task valued -100 is not added
// but raises g_suspending and parks the executor until the test clears the
// flag. Each task's event (if any) is signaled once the task has been handled.
// When the queue is stopped it only records that in `stopped`.
// Always returns 0.
int add_with_suspend3(void* meta, bthread::TaskIterator<LongIntTask>& iter) {
    int64_t* result = static_cast<int64_t*>(meta);
    if (iter.is_queue_stopped()) {
        stopped = true;
        return 0;
    }
    for (; iter; ++iter) {
        if (iter->value == -100) {
            g_suspending = true;
            while (g_suspending) {
                // Spin until the test thread clears the flag.
                // NOTE(review): a yielding sleep would be kinder to the
                // worker; confirm which sleep primitive is appropriate here.
            }
        } else {
            *result += iter->value;
        }
        if (iter->event) { iter->event->signal(); }
    }
    return 0;
}
// Cancels an urgent task that has been queued but not yet executed, and checks
// that the queue keeps working afterwards: a later normal task still runs and
// the queue stops and joins cleanly.
TEST_F(ExecutionQueueTest, cancel_unexecuted_high_priority_task) {
    g_should_be_urgent = false;
    // Start from a known state, as the other suspend-based tests do.
    g_suspending = false;
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    int64_t result = 0;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                add_with_suspend3, &result));
    // Push a normal task to make the executor suspend
    ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, -100));
    while (!g_suspending) {
        usleep(100);
    }
    // At this point, executor is suspended by the first task. Then we put
    // a high_priority task which is going to be cancelled immediately,
    // expecting that both operations are successful.
    bthread::TaskHandle h;
    ASSERT_EQ(0, bthread::execution_queue_execute(
                  queue_id, -100, &bthread::TASK_OPTIONS_URGENT, &h));
    ASSERT_EQ(0, bthread::execution_queue_cancel(h));
    // Resume executor
    g_suspending = false;
    // Push a normal task
    ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, 12345));
    // The execq should stop normally
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(12345, result);
}
} // namespace