blob: c651e3a0aa109c8753fd1940416a28bd623fa6db [file] [log] [blame]
#include <gtest/gtest.h>
#include <pthread.h>
#include "butil/containers/mpsc_queue.h"
namespace {
const uint MAX_COUNT = 1000000;
void Consume(butil::MPSCQueue<uint>& q, bool allow_empty) {
uint i = 0;
uint empty_count = 0;
while (true) {
uint d;
if (!q.Dequeue(d)) {
ASSERT_TRUE(allow_empty);
ASSERT_LT(empty_count++, (const uint)10000);
::usleep(10 * 1000);
continue;
}
ASSERT_EQ(i++, d);
if (i == MAX_COUNT) {
break;
}
}
}
void* ProduceThread(void* arg) {
auto q = (butil::MPSCQueue<uint>*)arg;
for (uint i = 0; i < MAX_COUNT; ++i) {
q->Enqueue(i);
}
return NULL;
}
void* ConsumeThread1(void* arg) {
auto q = (butil::MPSCQueue<uint>*)arg;
Consume(*q, true);
return NULL;
}
TEST(MPSCQueueTest, spsc_single_thread) {
butil::MPSCQueue<uint> q;
for (uint i = 0; i < MAX_COUNT; ++i) {
q.Enqueue(i);
}
Consume(q, false);
}
TEST(MPSCQueueTest, spsc_multi_thread) {
butil::MPSCQueue<uint> q;
pthread_t produce_tid;
ASSERT_EQ(0, pthread_create(&produce_tid, NULL, ProduceThread, &q));
pthread_t consume_tid;
ASSERT_EQ(0, pthread_create(&consume_tid, NULL, ConsumeThread1, &q));
pthread_join(produce_tid, NULL);
pthread_join(consume_tid, NULL);
}
butil::atomic<uint> g_index(0);
void* MultiProduceThread(void* arg) {
auto q = (butil::MPSCQueue<uint>*)arg;
while (true) {
uint i = g_index.fetch_add(1, butil::memory_order_relaxed);
if (i >= MAX_COUNT) {
break;
}
q->Enqueue(i);
}
return NULL;
}
butil::Mutex g_mutex;
bool g_counts[MAX_COUNT];
void Consume2(butil::MPSCQueue<uint>& q) {
uint empty_count = 0;
uint count = 0;
while (true) {
uint d;
if (!q.Dequeue(d)) {
ASSERT_LT(empty_count++, (const uint)10000);
::usleep(1 * 1000);
continue;
}
ASSERT_LT(d, MAX_COUNT);
{
BAIDU_SCOPED_LOCK(g_mutex);
ASSERT_FALSE(g_counts[d]);
g_counts[d] = true;
}
if (++count >= MAX_COUNT) {
break;
}
}
}
void* ConsumeThread2(void* arg) {
auto q = (butil::MPSCQueue<uint>*)arg;
Consume2(*q);
return NULL;
}
TEST(MPSCQueueTest, mpsc_multi_thread) {
butil::MPSCQueue<uint> q;
int thread_num = 8;
pthread_t threads[thread_num];
for (int i = 0; i < thread_num; ++i) {
ASSERT_EQ(0, pthread_create(&threads[i], NULL, MultiProduceThread, &q));
}
pthread_t consume_tid;
ASSERT_EQ(0, pthread_create(&consume_tid, NULL, ConsumeThread2, &q));
for (int i = 0; i < thread_num; ++i) {
pthread_join(threads[i], NULL);
}
pthread_join(consume_tid, NULL);
}
struct MyObject {};
TEST(MPSCQueueTest, mpsc_test_allocator) {
butil::ObjectPoolAllocator<MyObject> alloc;
auto p = alloc.Alloc();
butil::ObjectPoolInfo info = butil::describe_objects<butil::MPSCQueueNode<MyObject>>();
ASSERT_EQ(1, info.item_num);
alloc.Free(p);
info = butil::describe_objects<butil::MPSCQueueNode<MyObject>>();
ASSERT_EQ(1, info.item_num);
p = alloc.Alloc();
info = butil::describe_objects<butil::MPSCQueueNode<MyObject>>();
ASSERT_EQ(1, info.item_num);
}
}