blob: 54e47da4579bef9925d136ffd864929531894199 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#if BRPC_WITH_RDMA
#include <errno.h>
#include <stdlib.h>
#include <vector>
#include <gflags/gflags.h>
#include "butil/fast_rand.h"
#include "butil/iobuf.h"
#include "butil/object_pool.h"
#include "butil/thread_local.h"
#include "bthread/bthread.h"
#include "brpc/rdma/block_pool.h"
namespace brpc {
namespace rdma {
// Memory pool tuning flags. The size flags are in MB; InitBlockPool() rejects
// values outside the RDMA_MEMORY_POOL_* bounds defined in this file.
DEFINE_int32(rdma_memory_pool_initial_size_mb, 1024,
"Initial size of memory pool for RDMA (MB)");
DEFINE_int32(rdma_memory_pool_increase_size_mb, 1024,
"Increased size of memory pool for RDMA (MB)");
DEFINE_int32(rdma_memory_pool_max_regions, 4, "Max number of regions");
DEFINE_int32(rdma_memory_pool_buckets, 4, "Number of buckets to reduce race");
// Upper bound on default-size blocks cached per thread before half of them
// are pushed back to a global bucket.
DEFINE_int32(rdma_memory_pool_tls_cache_num, 128, "Number of cached block in tls");
// Callback that registers every new region; its return value is stored as the
// region id, and 0 is treated as a registration failure.
static RegisterCallback g_cb = NULL;
// Number of bytes in 1MB
static const size_t BYTES_IN_MB = 1048576;
// Block size classes, used as indexes into g_block_size.
static const int BLOCK_DEFAULT = 0; // 8KB
static const int BLOCK_LARGE = 1; // 64KB
static const int BLOCK_HUGE = 2; // 2MB
static const int BLOCK_SIZE_COUNT = 3;
// Block size in bytes for each class above.
static size_t g_block_size[BLOCK_SIZE_COUNT] = { 8192, 65536, 2 * BYTES_IN_MB };
// A run of contiguous idle memory linked into an idle list. `len` is a
// multiple of the owning block size: a node either spans several blocks
// (e.g. a fresh chunk of a region) or exactly one block.
struct IdleNode {
void* start;     // First byte of the idle run
size_t len;      // Length of the run in bytes
IdleNode* next;  // Next node in the same list, or NULL
};
// One registered memory region. Every block carved from a region belongs to a
// single size class (`block_type`) and shares one registration id.
struct Region {
    // Only `start` is reset: a zero start marks an unused slot in g_regions.
    Region() : start(0) {}

    uintptr_t start;      // Base address, 0 while the slot is unused
    size_t size;          // Length of the region in bytes
    uint32_t block_type;  // Index into g_block_size
    uint32_t id;          // Value returned by the register callback (lkey)
};
// Accepted range of FLAGS_rdma_memory_pool_max_regions.
static const int32_t RDMA_MEMORY_POOL_MIN_REGIONS = 1;
static const int32_t RDMA_MEMORY_POOL_MAX_REGIONS = 16;
// Region table. Slots are filled in order; the first slot with start == 0
// ends the used part of the table.
static Region g_regions[RDMA_MEMORY_POOL_MAX_REGIONS];
static int g_region_num = 0;
// Accepted range (in MB) of the initial/increase size flags.
static const int32_t RDMA_MEMORY_POOL_MIN_SIZE = 32; // 32MB
static const int32_t RDMA_MEMORY_POOL_MAX_SIZE = 1048576; // 1TB
// Accepted range of FLAGS_rdma_memory_pool_buckets.
static const int32_t RDMA_MEMORY_POOL_MIN_BUCKETS = 1;
static const int32_t RDMA_MEMORY_POOL_MAX_BUCKETS = 16;
// Number of buckets per block size, copied from the flag at init.
static size_t g_buckets = 1;
// While true, alloc/dealloc serialize with DumpMemoryPoolInfo() on g_dump_mutex.
// NOTE(review): plain bool shared across threads without synchronization;
// std::atomic<bool> would make the handshake well-defined.
static bool g_dump_enable = false;
static butil::Mutex* g_dump_mutex = NULL;
// Only for default block size
// Per-thread cache of idle default-size blocks and its length.
static __thread IdleNode* tls_idle_list = NULL;
static __thread size_t tls_idle_num = 0;
// Whether this thread has registered its cache (thread-exit hook + dump entry).
static __thread bool tls_inited = false;
// Guards g_tls_info / g_tls_info_cnt.
static butil::Mutex* g_tls_info_mutex = NULL;
static size_t g_tls_info_cnt = 0;
// Addresses of registered threads' tls_idle_num (at most 1024), read by the dump.
static size_t* g_tls_info[1024];
// For each block size, there are some buckets of idle list to reduce race.
// idle_list[t][b] is guarded by *lock[t][b]; idle_size[t][b] tracks the bytes
// idle in that list. extend_lock serializes pool extension.
struct GlobalInfo {
std::vector<IdleNode*> idle_list[BLOCK_SIZE_COUNT];
std::vector<butil::Mutex*> lock[BLOCK_SIZE_COUNT];
std::vector<size_t> idle_size[BLOCK_SIZE_COUNT];
butil::Mutex extend_lock;
};
static GlobalInfo* g_info = NULL;
// Returns the region that contains `buf`.
// Returns NULL with errno = EINVAL when `buf` is NULL, and NULL (errno
// untouched) when `buf` lies outside every region.
static inline Region* GetRegion(const void* buf) {
    if (!buf) {
        errno = EINVAL;
        return NULL;
    }
    const uintptr_t addr = (uintptr_t)buf;
    // Bound the scan by the table size, not FLAGS_rdma_memory_pool_max_regions:
    // the flag is a mutable global. A larger value would read past g_regions,
    // and a smaller one would hide regions that already exist.
    for (int i = 0; i < RDMA_MEMORY_POOL_MAX_REGIONS; ++i) {
        Region& region = g_regions[i];
        if (region.start == 0) {
            break;  // Slots are filled in order; the rest are unused.
        }
        if (addr >= region.start && addr < region.start + region.size) {
            return &region;
        }
    }
    return NULL;
}
uint32_t GetRegionId(const void* buf) {
Region* r = GetRegion(buf);
if (!r) {
return 0;
}
return r->id;
}
// Extend the block pool with a new region (with different region ID)
static void* ExtendBlockPool(size_t region_size, int block_type) {
if (region_size < 1) {
errno = EINVAL;
return NULL;
}
if (g_region_num == FLAGS_rdma_memory_pool_max_regions) {
LOG(INFO) << "Memory pool reaches max regions";
errno = ENOMEM;
return NULL;
}
// Regularize region size
region_size = region_size * BYTES_IN_MB / g_block_size[block_type] / g_buckets;
region_size *= g_block_size[block_type] * g_buckets;
LOG(INFO) << "Start extend rdma memory " << region_size / BYTES_IN_MB << "MB";
void* region_base = NULL;
if (posix_memalign(&region_base, 4096, region_size) != 0) {
PLOG_EVERY_SECOND(ERROR) << "Memory not enough";
return NULL;
}
uint32_t id = g_cb(region_base, region_size);
if (id == 0) {
free(region_base);
return NULL;
}
IdleNode* node[g_buckets];
for (size_t i = 0; i < g_buckets; ++i) {
node[i] = butil::get_object<IdleNode>();
if (!node[i]) {
PLOG_EVERY_SECOND(ERROR) << "Memory not enough";
for (size_t j = 0; j < i; ++j) {
butil::return_object<IdleNode>(node[j]);
}
free(region_base);
return NULL;
}
}
Region* region = &g_regions[g_region_num++];
region->start = (uintptr_t)region_base;
region->size = region_size;
region->id = id;
region->block_type = block_type;
for (size_t i = 0; i < g_buckets; ++i) {
node[i]->start = (void*)(region->start + i * (region_size / g_buckets));
node[i]->len = region_size / g_buckets;
node[i]->next = NULL;
g_info->idle_list[block_type][i] = node[i];
g_info->idle_size[block_type][i] += node[i]->len;
}
return region_base;
}
void* InitBlockPool(RegisterCallback cb) {
if (!cb) {
errno = EINVAL;
return NULL;
}
if (g_cb) {
LOG(WARNING) << "Do not initialize block pool repeatedly";
errno = EINVAL;
return NULL;
}
g_cb = cb;
if (FLAGS_rdma_memory_pool_max_regions < RDMA_MEMORY_POOL_MIN_REGIONS ||
FLAGS_rdma_memory_pool_max_regions > RDMA_MEMORY_POOL_MAX_REGIONS) {
LOG(WARNING) << "rdma_memory_pool_max_regions("
<< FLAGS_rdma_memory_pool_max_regions << ") not in ["
<< RDMA_MEMORY_POOL_MIN_REGIONS << ","
<< RDMA_MEMORY_POOL_MAX_REGIONS << "]!";
errno = EINVAL;
return NULL;
}
if (FLAGS_rdma_memory_pool_initial_size_mb < RDMA_MEMORY_POOL_MIN_SIZE ||
FLAGS_rdma_memory_pool_initial_size_mb > RDMA_MEMORY_POOL_MAX_SIZE) {
LOG(WARNING) << "rdma_memory_pool_initial_size_mb("
<< FLAGS_rdma_memory_pool_initial_size_mb << ") not in ["
<< RDMA_MEMORY_POOL_MIN_SIZE << ","
<< RDMA_MEMORY_POOL_MAX_SIZE << "]!";
errno = EINVAL;
return NULL;
}
if (FLAGS_rdma_memory_pool_increase_size_mb < RDMA_MEMORY_POOL_MIN_SIZE ||
FLAGS_rdma_memory_pool_increase_size_mb > RDMA_MEMORY_POOL_MAX_SIZE) {
LOG(WARNING) << "rdma_memory_pool_increase_size_mb("
<< FLAGS_rdma_memory_pool_increase_size_mb << ") not in ["
<< RDMA_MEMORY_POOL_MIN_SIZE << ","
<< RDMA_MEMORY_POOL_MAX_SIZE << "]!";
errno = EINVAL;
return NULL;
}
if (FLAGS_rdma_memory_pool_buckets < RDMA_MEMORY_POOL_MIN_BUCKETS ||
FLAGS_rdma_memory_pool_buckets > RDMA_MEMORY_POOL_MAX_BUCKETS) {
LOG(WARNING) << "rdma_memory_pool_buckets("
<< FLAGS_rdma_memory_pool_buckets << ") not in ["
<< RDMA_MEMORY_POOL_MIN_BUCKETS << ","
<< RDMA_MEMORY_POOL_MAX_BUCKETS << "]!";
errno = EINVAL;
return NULL;
}
g_buckets = FLAGS_rdma_memory_pool_buckets;
g_info = new (std::nothrow) GlobalInfo;
if (!g_info) {
return NULL;
}
for (int i = 0; i < BLOCK_SIZE_COUNT; ++i) {
g_info->idle_list[i].resize(g_buckets, NULL);
if (g_info->idle_list[i].size() != g_buckets) {
return NULL;
}
g_info->lock[i].resize(g_buckets, NULL);
if (g_info->lock[i].size() != g_buckets) {
return NULL;
}
g_info->idle_size[i].resize(g_buckets, 0);
if (g_info->idle_size[i].size() != g_buckets) {
return NULL;
}
for (size_t j = 0; j < g_buckets; ++j) {
g_info->lock[i][j] = new (std::nothrow) butil::Mutex;
if (!g_info->lock[i][j]) {
return NULL;
}
}
}
g_dump_mutex = new butil::Mutex;
g_tls_info_mutex = new butil::Mutex;
return ExtendBlockPool(FLAGS_rdma_memory_pool_initial_size_mb,
BLOCK_DEFAULT);
}
static void* AllocBlockFrom(int block_type) {
bool locked = false;
if (BAIDU_UNLIKELY(g_dump_enable)) {
g_dump_mutex->lock();
locked = true;
}
void* ptr = NULL;
if (block_type == 0 && tls_idle_list != NULL){
CHECK(tls_idle_num > 0);
IdleNode* n = tls_idle_list;
tls_idle_list = n->next;
ptr = n->start;
butil::return_object<IdleNode>(n);
tls_idle_num--;
if (locked) {
g_dump_mutex->unlock();
}
return ptr;
}
uint64_t index = butil::fast_rand() % g_buckets;
BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]);
IdleNode* node = g_info->idle_list[block_type][index];
if (!node) {
BAIDU_SCOPED_LOCK(g_info->extend_lock);
node = g_info->idle_list[block_type][index];
if (!node) {
// There is no block left, extend a new region
if (!ExtendBlockPool(FLAGS_rdma_memory_pool_increase_size_mb,
block_type)) {
LOG_EVERY_SECOND(ERROR) << "Fail to extend new region. "
<< "You can set the size of memory pool larger. "
<< "Refer to the help message of these flags: "
<< "rdma_memory_pool_initial_size_mb, "
<< "rdma_memory_pool_increase_size_mb, "
<< "rdma_memory_pool_max_regions.";
if (locked) {
g_dump_mutex->unlock();
}
return NULL;
}
node = g_info->idle_list[block_type][index];
}
}
if (node) {
ptr = node->start;
if (node->len > g_block_size[block_type]) {
node->start = (char*)node->start + g_block_size[block_type];
node->len -= g_block_size[block_type];
} else {
g_info->idle_list[block_type][index] = node->next;
butil::return_object<IdleNode>(node);
}
g_info->idle_size[block_type][index] -= g_block_size[block_type];
} else {
if (locked) {
g_dump_mutex->unlock();
}
return NULL;
}
// Move more blocks from global list to tls list
if (block_type == 0) {
node = g_info->idle_list[0][index];
tls_idle_list = node;
IdleNode* last_node = NULL;
while (node) {
if (tls_idle_num > (uint32_t)FLAGS_rdma_memory_pool_tls_cache_num / 2
|| node->len > g_block_size[0]) {
break;
}
tls_idle_num++;
last_node = node;
node = node->next;
}
if (tls_idle_num == 0) {
tls_idle_list = NULL;
} else {
g_info->idle_list[0][index] = node;
}
if (last_node) {
last_node->next = NULL;
}
}
if (locked) {
g_dump_mutex->unlock();
}
return ptr;
}
void* AllocBlock(size_t size) {
if (size == 0 || size > g_block_size[BLOCK_SIZE_COUNT - 1]) {
errno = EINVAL;
return NULL;
}
for (int i = 0; i < BLOCK_SIZE_COUNT; ++i) {
if (size <= g_block_size[i]) {
return AllocBlockFrom(i);;
}
}
return NULL;
}
void RecycleAll() {
// Only block_type == 0 needs recycle
while (tls_idle_list) {
IdleNode* node = tls_idle_list;
tls_idle_list = node->next;
Region* r = GetRegion(node->start);
uint64_t index = ((uintptr_t)node->start - r->start) * g_buckets / r->size;
BAIDU_SCOPED_LOCK(*g_info->lock[0][index]);
node->next = g_info->idle_list[0][index];
g_info->idle_list[0][index] = node;
}
tls_idle_num = 0;
}
int DeallocBlock(void* buf) {
if (!buf) {
errno = EINVAL;
return -1;
}
Region* r = GetRegion(buf);
if (!r) {
errno = ERANGE;
return -1;
}
IdleNode* node = butil::get_object<IdleNode>();
if (!node) {
PLOG_EVERY_SECOND(ERROR) << "Memory not enough";
// May lead to block leak, but do not return -1
return 0;
}
uint32_t block_type = r->block_type;
size_t block_size = g_block_size[block_type];
node->start = buf;
node->len = block_size;
bool locked = false;
if (BAIDU_UNLIKELY(g_dump_enable)) {
g_dump_mutex->lock();
locked = true;
}
if (block_type == 0 && tls_idle_num < (uint32_t)FLAGS_rdma_memory_pool_tls_cache_num) {
if (!tls_inited) {
tls_inited = true;
butil::thread_atexit(RecycleAll);
BAIDU_SCOPED_LOCK(*g_tls_info_mutex);
if (g_tls_info_cnt < 1024) {
g_tls_info[g_tls_info_cnt++] = &tls_idle_num;
}
}
tls_idle_num++;
node->next = tls_idle_list;
tls_idle_list = node;
if (locked) {
g_dump_mutex->unlock();
}
return 0;
}
uint64_t index = ((uintptr_t)buf - r->start) * g_buckets / r->size;
if (block_type == 0) {
size_t len = 0;
// Recycle half the cached blocks in tls for default block size
int num = FLAGS_rdma_memory_pool_tls_cache_num / 2;
IdleNode* new_head = tls_idle_list;
IdleNode* recycle_tail = NULL;
for (int i = 0; i < num; ++i) {
recycle_tail = new_head;
len += recycle_tail->len;
new_head = new_head->next;
}
if (recycle_tail) {
BAIDU_SCOPED_LOCK(*g_info->lock[0][index]);
recycle_tail->next = node;
node->next = g_info->idle_list[0][index];
g_info->idle_list[0][index] = tls_idle_list;
g_info->idle_size[0][index] += len;
}
tls_idle_list = new_head;
tls_idle_num -= num;
} else {
BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]);
node->next = g_info->idle_list[block_type][index];
g_info->idle_list[block_type][index] = node;
g_info->idle_size[block_type][index] += node->len;
}
if (locked) {
g_dump_mutex->unlock();
}
return 0;
}
size_t GetBlockSize(int type) {
return g_block_size[type];
}
void DumpMemoryPoolInfo(std::ostream& os) {
if (!g_dump_mutex) {
return;
}
g_dump_enable = true;
usleep(1000); // wait until all the threads read new g_dump_enable
BAIDU_SCOPED_LOCK(*g_dump_mutex);
os << "********************* Memory Pool Info Dump **********************\n";
os << "Region Info:\n";
for (int i = 0; i < g_region_num; ++i) {
os << "\tRegion " << i << ":\n"
<< "\t\tBase Addr: " << g_regions[i].start << "\n"
<< "\t\tSize: " << g_regions[i].size << "\n"
<< "\t\tBlock Type: " << g_regions[i].block_type << "\n"
<< "\t\tId: " << g_regions[i].id << "\n";
}
os << "Idle List Info:\n";
for (int i = 0; i < BLOCK_SIZE_COUNT; ++i) {
os << "\tFor block size " << GetBlockSize(i) << ":\n";
for (size_t j = 0; j < g_buckets; ++j) {
os << "\t\tBucket " << j << ": " << g_info->idle_size[i][j] << "\n";
}
}
os << "Thread Local Cache Info:\n";
for (size_t i = 0; i < g_tls_info_cnt; ++i) {
os << "\tThread " << i << ": " << *g_tls_info[i] * 8192 << "\n";
}
os << "******************************************************************\n";
g_dump_enable = false;
}
// Just for UT
void DestroyBlockPool() {
RecycleAll();
for (int i = 0; i < BLOCK_SIZE_COUNT; ++i) {
for (size_t j = 0; j < g_buckets; ++j) {
IdleNode* node = g_info->idle_list[i][j];
while (node) {
IdleNode* tmp = node->next;
butil::return_object<IdleNode>(node);
node = tmp;
}
g_info->idle_list[i][j] = NULL;
}
}
delete g_info;
g_info = NULL;
for (int i = 0; i < g_region_num; ++i) {
if (g_regions[i].start == 0) {
break;
}
free((void*)g_regions[i].start);
g_regions[i].start = 0;
}
g_region_num = 0;
g_cb = NULL;
}
// Just for UT
int GetBlockType(void* buf) {
Region* r = GetRegion(buf);
if (!r) {
return -1;
}
return r->block_type;
}
// Just for UT
size_t GetGlobalLen(int block_type) {
size_t len = 0;
for (size_t i = 0; i < g_buckets; ++i) {
IdleNode* node = g_info->idle_list[block_type][i];
while (node) {
len += node->len;
node = node->next;
}
}
return len;
}
// Just for UT
// Returns how many regions have been allocated so far.
size_t GetRegionNum() {
    const size_t count = g_region_num;
    return count;
}
} // namespace rdma
} // namespace brpc
#endif // if BRPC_WITH_RDMA