Skip to content

Commit 5eb557b

Browse files
committed
Support dynamic expansion of RDMA block pool
1 parent ede6b77 commit 5eb557b

File tree

2 files changed

+75
-83
lines changed

2 files changed

+75
-83
lines changed

src/brpc/rdma/block_pool.cpp

Lines changed: 73 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,9 @@
2525
#include "butil/iobuf.h"
2626
#include "butil/object_pool.h"
2727
#include "butil/thread_local.h"
28-
#include "bthread/bthread.h"
28+
#include "butil/memory/scope_guard.h"
2929
#include "brpc/rdma/block_pool.h"
3030

31-
3231
namespace brpc {
3332
namespace rdma {
3433

@@ -40,7 +39,7 @@ DEFINE_int32(rdma_memory_pool_max_regions, 3, "Max number of regions");
4039
DEFINE_int32(rdma_memory_pool_buckets, 4, "Number of buckets to reduce race");
4140
DEFINE_int32(rdma_memory_pool_tls_cache_num, 128, "Number of cached block in tls");
4241
DEFINE_bool(rdma_memory_pool_user_specified_memory, false,
43-
"If true, the user must call UserExtendBlockPool() to extend "
42+
"[DEPRECATED]If true, the user must call UserExtendBlockPool() to extend "
4443
"memory. bRPC will not handle memory extension.");
4544

4645
static RegisterCallback g_cb = NULL;
@@ -98,6 +97,8 @@ struct GlobalInfo {
9897
std::vector<size_t> idle_size[BLOCK_SIZE_COUNT];
9998
int region_num[BLOCK_SIZE_COUNT];
10099
butil::Mutex extend_lock;
100+
std::vector<IdleNode*> expansion_list[BLOCK_SIZE_COUNT];
101+
std::vector<size_t> expansion_size[BLOCK_SIZE_COUNT];
101102
};
102103
static GlobalInfo* g_info = NULL;
103104

@@ -129,36 +130,20 @@ uint32_t GetRegionId(const void* buf) {
129130
return r->id;
130131
}
131132

132-
// When both rdma_memory_pool_max_regions and rdma_memory_pool_buckets are
133-
// greater than 1, dynamic memory expansion may cause concurrent modification
134-
// issues in the memory linked list due to lock contention problems. To address
135-
// this, we increase the region_num count for each block_type. Dynamic memory
136-
// expansion is only permitted when either of the following conditions is met:
137-
// rdma_memory_pool_buckets equals 1
138-
// g_info->region_num[block_type] is less than 1
139-
static bool CanExtendBlockRuntime(int block_type) {
140-
return FLAGS_rdma_memory_pool_buckets == 1 ||
141-
g_info->region_num[block_type] < 1;
142-
}
143-
144-
static void* ExtendBlockPoolImpl(void* region_base, size_t region_size,
145-
int block_type) {
146-
if (CanExtendBlockRuntime(block_type) == false) {
147-
LOG(INFO) << "Runtime extend memory only support one bucket or region "
148-
"num is zero for per block_type";
133+
static void* ExtendBlockPoolImpl(void* region_base, size_t region_size, int block_type) {
134+
auto region_base_guard = butil::MakeScopeGuard([region_base]() {
149135
free(region_base);
150-
errno = ENOMEM;
151-
return NULL;
152-
}
136+
});
137+
153138
if (g_region_num == FLAGS_rdma_memory_pool_max_regions) {
154-
LOG(INFO) << "Memory pool reaches max regions";
155-
free(region_base);
139+
PLOG_EVERY_SECOND(ERROR) << "Memory pool reaches max regions";
156140
errno = ENOMEM;
157141
return NULL;
158142
}
143+
159144
uint32_t id = g_cb(region_base, region_size);
160145
if (id == 0) {
161-
free(region_base);
146+
errno = EINVAL;
162147
return NULL;
163148
}
164149

@@ -170,7 +155,7 @@ static void* ExtendBlockPoolImpl(void* region_base, size_t region_size,
170155
for (size_t j = 0; j < i; ++j) {
171156
butil::return_object<IdleNode>(node[j]);
172157
}
173-
free(region_base);
158+
errno = ENOMEM;
174159
return NULL;
175160
}
176161
}
@@ -184,12 +169,15 @@ static void* ExtendBlockPoolImpl(void* region_base, size_t region_size,
184169
for (size_t i = 0; i < g_buckets; ++i) {
185170
node[i]->start = (void*)(region->start + i * (region_size / g_buckets));
186171
node[i]->len = region_size / g_buckets;
187-
node[i]->next = g_info->idle_list[block_type][i];
188-
g_info->idle_list[block_type][i] = node[i];
189-
g_info->idle_size[block_type][i] += node[i]->len;
172+
node[i]->next = g_info->expansion_list[block_type][i];
173+
g_info->expansion_list[block_type][i] = node[i];
174+
g_info->expansion_size[block_type][i] += node[i]->len;
190175
}
191176
g_info->region_num[block_type]++;
192177

178+
// `region_base' is in use, so it must not be freed.
179+
region_base_guard.dismiss();
180+
193181
return region_base;
194182
}
195183

@@ -200,13 +188,6 @@ static void* ExtendBlockPool(size_t region_size, int block_type) {
200188
return NULL;
201189
}
202190

203-
if (FLAGS_rdma_memory_pool_user_specified_memory) {
204-
LOG_EVERY_SECOND(ERROR) << "Fail to extend new region, "
205-
"rdma_memory_pool_user_specified_memory is "
206-
"true, ExtendBlockPool is disabled";
207-
return NULL;
208-
}
209-
210191
// Regularize region size
211192
region_size = region_size * BYTES_IN_MB / g_block_size[block_type] / g_buckets;
212193
region_size *= g_block_size[block_type] * g_buckets;
@@ -222,24 +203,19 @@ static void* ExtendBlockPool(size_t region_size, int block_type) {
222203
return ExtendBlockPoolImpl(region_base, region_size, block_type);
223204
}
224205

225-
void* ExtendBlockPoolByUser(void* region_base, size_t region_size,
226-
int block_type) {
227-
if (FLAGS_rdma_memory_pool_user_specified_memory == false) {
228-
LOG_EVERY_SECOND(ERROR) << "User extend memory is disabled";
229-
return NULL;
230-
}
206+
void* ExtendBlockPoolByUser(void* region_base, size_t region_size, int block_type) {
231207
if (reinterpret_cast<uintptr_t>(region_base) % 4096 != 0) {
232208
LOG_EVERY_SECOND(ERROR) << "region_base must be 4096 aligned";
209+
free(region_base);
210+
errno = EINVAL;
233211
return NULL;
234212
}
235213

236-
uint64_t index = butil::fast_rand() % g_buckets;
237-
BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]);
238-
BAIDU_SCOPED_LOCK(g_info->extend_lock);
239214
region_size =
240215
region_size * BYTES_IN_MB / g_block_size[block_type] / g_buckets;
241216
region_size *= g_block_size[block_type] * g_buckets;
242217

218+
BAIDU_SCOPED_LOCK(g_info->extend_lock);
243219
return ExtendBlockPoolImpl(region_base, region_size, block_type);
244220
}
245221

@@ -316,6 +292,14 @@ bool InitBlockPool(RegisterCallback cb) {
316292
return false;
317293
}
318294
}
295+
g_info->expansion_list[i].resize(g_buckets, NULL);
296+
if (g_info->expansion_list[i].size() != g_buckets) {
297+
return false;
298+
}
299+
g_info->expansion_size[i].resize(g_buckets, 0);
300+
if (g_info->expansion_size[i].size() != g_buckets) {
301+
return false;
302+
}
319303
}
320304

321305
g_dump_mutex = new butil::Mutex;
@@ -332,66 +316,74 @@ bool InitBlockPool(RegisterCallback cb) {
332316
return false;
333317
}
334318

319+
static void MoveExpansionList2EmptyIdleList(int block_type, size_t index) {
320+
CHECK(NULL == g_info->idle_list[block_type][index]);
321+
322+
g_info->idle_list[block_type][index] = g_info->expansion_list[block_type][index];
323+
g_info->idle_size[block_type][index] += g_info->expansion_size[block_type][index];
324+
g_info->expansion_list[block_type][index] = NULL;
325+
g_info->expansion_size[block_type][index] = 0;
326+
}
327+
335328
static void* AllocBlockFrom(int block_type) {
336329
bool locked = false;
337330
if (BAIDU_UNLIKELY(g_dump_enable)) {
338331
g_dump_mutex->lock();
339332
locked = true;
340333
}
334+
BUTIL_SCOPE_EXIT {
335+
if (locked) {
336+
g_dump_mutex->unlock();
337+
}
338+
};
339+
341340
void* ptr = NULL;
342-
if (block_type == 0 && tls_idle_list != NULL){
341+
if (0 == block_type && NULL != tls_idle_list) {
343342
CHECK(tls_idle_num > 0);
344343
IdleNode* n = tls_idle_list;
345344
tls_idle_list = n->next;
346345
ptr = n->start;
347346
butil::return_object<IdleNode>(n);
348347
tls_idle_num--;
349-
if (locked) {
350-
g_dump_mutex->unlock();
351-
}
352348
return ptr;
353349
}
354350

355-
uint64_t index = butil::fast_rand() % g_buckets;
351+
size_t index = butil::fast_rand() % g_buckets;
356352
BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]);
357353
IdleNode* node = g_info->idle_list[block_type][index];
358-
if (!node) {
354+
if (NULL == node) {
359355
BAIDU_SCOPED_LOCK(g_info->extend_lock);
360356
node = g_info->idle_list[block_type][index];
361-
if (!node) {
362-
// There is no block left, extend a new region
363-
if (!ExtendBlockPool(FLAGS_rdma_memory_pool_increase_size_mb,
364-
block_type)) {
357+
if (NULL == node && NULL != g_info->expansion_list[block_type][index]) {
358+
MoveExpansionList2EmptyIdleList(block_type, index);
359+
node = g_info->idle_list[block_type][index];
360+
}
361+
if (NULL == node) {
362+
// There is no block left, extend a new region.
363+
if (!ExtendBlockPool(FLAGS_rdma_memory_pool_increase_size_mb, block_type)) {
365364
LOG_EVERY_SECOND(ERROR) << "Fail to extend new region. "
366365
<< "You can set the size of memory pool larger. "
367366
<< "Refer to the help message of these flags: "
368367
<< "rdma_memory_pool_initial_size_mb, "
369368
<< "rdma_memory_pool_increase_size_mb, "
370369
<< "rdma_memory_pool_max_regions.";
371-
if (locked) {
372-
g_dump_mutex->unlock();
373-
}
374370
return NULL;
375371
}
372+
MoveExpansionList2EmptyIdleList(block_type, index);
376373
node = g_info->idle_list[block_type][index];
377374
}
378375
}
379-
if (node) {
380-
ptr = node->start;
381-
if (node->len > g_block_size[block_type]) {
382-
node->start = (char*)node->start + g_block_size[block_type];
383-
node->len -= g_block_size[block_type];
384-
} else {
385-
g_info->idle_list[block_type][index] = node->next;
386-
butil::return_object<IdleNode>(node);
387-
}
388-
g_info->idle_size[block_type][index] -= g_block_size[block_type];
376+
CHECK(NULL != node);
377+
378+
ptr = node->start;
379+
if (node->len > g_block_size[block_type]) {
380+
node->start = (char*)node->start + g_block_size[block_type];
381+
node->len -= g_block_size[block_type];
389382
} else {
390-
if (locked) {
391-
g_dump_mutex->unlock();
392-
}
393-
return NULL;
383+
g_info->idle_list[block_type][index] = node->next;
384+
butil::return_object<IdleNode>(node);
394385
}
386+
g_info->idle_size[block_type][index] -= g_block_size[block_type];
395387

396388
// Move more blocks from global list to tls list
397389
if (block_type == 0) {
@@ -417,9 +409,6 @@ static void* AllocBlockFrom(int block_type) {
417409
}
418410
}
419411

420-
if (locked) {
421-
g_dump_mutex->unlock();
422-
}
423412
return ptr;
424413
}
425414

@@ -482,6 +471,12 @@ int DeallocBlock(void* buf) {
482471
g_dump_mutex->lock();
483472
locked = true;
484473
}
474+
BUTIL_SCOPE_EXIT {
475+
if (locked) {
476+
g_dump_mutex->unlock();
477+
}
478+
};
479+
485480
if (block_type == 0 && tls_idle_num < (uint32_t)FLAGS_rdma_memory_pool_tls_cache_num) {
486481
if (!tls_inited) {
487482
tls_inited = true;
@@ -494,9 +489,6 @@ int DeallocBlock(void* buf) {
494489
tls_idle_num++;
495490
node->next = tls_idle_list;
496491
tls_idle_list = node;
497-
if (locked) {
498-
g_dump_mutex->unlock();
499-
}
500492
return 0;
501493
}
502494

@@ -527,9 +519,6 @@ int DeallocBlock(void* buf) {
527519
g_info->idle_list[block_type][index] = node;
528520
g_info->idle_size[block_type][index] += node->len;
529521
}
530-
if (locked) {
531-
g_dump_mutex->unlock();
532-
}
533522
return 0;
534523
}
535524

@@ -557,7 +546,8 @@ void DumpMemoryPoolInfo(std::ostream& os) {
557546
for (int i = 0; i < BLOCK_SIZE_COUNT; ++i) {
558547
os << "\tFor block size " << GetBlockSize(i) << ":\n";
559548
for (size_t j = 0; j < g_buckets; ++j) {
560-
os << "\t\tBucket " << j << ": " << g_info->idle_size[i][j] << "\n";
549+
os << "\t\tBucket " << j << ": {" << g_info->idle_size[i][j]
550+
<< ", " << g_info->expansion_list[i][j] << "}\n";
561551
}
562552
}
563553
os << "Thread Local Cache Info:\n";

src/butil/memory/scope_guard.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,4 +104,6 @@ operator+(ScopeExitHelper, Callback&& callback) {
104104
auto BRPC_ANONYMOUS_VARIABLE(SCOPE_EXIT) = \
105105
::butil::internal::ScopeExitHelper() + [&]() noexcept
106106

107+
#define BUTIL_SCOPE_EXIT BRPC_SCOPE_EXIT
108+
107109
#endif // BUTIL_SCOPED_GUARD_H

0 commit comments

Comments
 (0)