Skip to content

Commit c8eb39c

Browse files
committed
refactor: abstract multi-tier block management from BlockManagerPool.
1 parent 7c951c7 commit c8eb39c

12 files changed

+398
-384
lines changed

xllm/core/framework/block/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@ cc_library(
1111
block_manager_pool.h
1212
block_manager_impl.h
1313
concurrent_block_manager_impl.h
14+
multi_tier_block_manager_pool.h
1415
SRCS
1516
block.cpp
1617
block_manager_pool.cpp
1718
concurrent_block_manager_impl.cpp
1819
block_manager_impl.cpp
20+
multi_tier_block_manager_pool.cpp
1921
DEPS
2022
$<$<BOOL:${USE_NPU}>:torch_npu>
2123
$<$<BOOL:${USE_NPU}>:graph>

xllm/core/framework/block/block_manager_pool.cpp

Lines changed: 0 additions & 197 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ BlockManagerPool::BlockManagerPool(const Options& options, int32_t dp_size)
2424
: options_(options) {
2525
CHECK(dp_size > 0) << "dp_size must be greater than 0";
2626
block_managers_.reserve(dp_size);
27-
host_block_managers_.reserve(dp_size);
2827

2928
BlockManager::Options npu_options;
3029
npu_options.num_blocks(options_.num_blocks())
@@ -33,31 +32,16 @@ BlockManagerPool::BlockManagerPool(const Options& options, int32_t dp_size)
3332
.enable_disagg_pd(options_.enable_disagg_pd())
3433
.enable_cache_upload(options_.enable_cache_upload());
3534

36-
BlockManager::Options host_options = npu_options;
37-
host_options.num_blocks(options_.host_num_blocks())
38-
.enable_cache_upload(false);
39-
4035
for (int32_t i = 0; i < dp_size; ++i) {
4136
if (options.enable_disagg_pd() || options_.enable_kvcache_store()) {
4237
block_managers_.emplace_back(
4338
std::make_unique<ConcurrentBlockManagerImpl>(npu_options));
44-
if (options_.host_num_blocks() > 0) {
45-
host_block_managers_.emplace_back(
46-
std::make_unique<ConcurrentBlockManagerImpl>(host_options));
47-
}
4839
} else {
4940
block_managers_.emplace_back(
5041
std::make_unique<BlockManagerImpl>(npu_options));
51-
if (options_.host_num_blocks() > 0) {
52-
host_block_managers_.emplace_back(
53-
std::make_unique<BlockManagerImpl>(host_options));
54-
}
5542
}
5643
}
5744
reset_transfer_infos();
58-
offload_block_transfer_infos_.resize(block_managers_.size());
59-
released_host_blocks_.resize(block_managers_.size());
60-
released_device_blocks_.resize(block_managers_.size());
6145
}
6246

6347
int32_t BlockManagerPool::get_manager_with_max_free_blocks() const {
@@ -89,16 +73,6 @@ int32_t BlockManagerPool::get_dp_rank(Sequence* sequence) const {
8973
return dp_rank;
9074
}
9175

92-
BlockManager* BlockManagerPool::get_block_manager(Sequence* sequence,
93-
bool is_host) {
94-
int32_t dp_rank = get_dp_rank(sequence);
95-
if (is_host) {
96-
return host_block_managers_[dp_rank].get();
97-
} else {
98-
return block_managers_[dp_rank].get();
99-
}
100-
}
101-
10276
void BlockManagerPool::deallocate(Request* request) {
10377
DCHECK(request != nullptr);
10478
for (auto& sequence : request->sequences()) {
@@ -117,9 +91,6 @@ void BlockManagerPool::deallocate(Sequence* sequence) {
11791
// add blocks to the prefix cache
11892
int32_t dp_rank = get_dp_rank(sequence);
11993
cache(sequence);
120-
if (!host_block_managers_.empty()) {
121-
save_offload_blocks(sequence);
122-
}
12394
block_managers_[dp_rank]->deallocate(sequence->kv_state().kv_blocks());
12495
// release the blocks after prefix cache insertion
12596
sequence->reset();
@@ -130,57 +101,9 @@ BlockManagerPool::get_swap_block_transfer_infos() {
130101
return &swap_block_transfer_infos_;
131102
}
132103

133-
std::vector<std::vector<BlockTransferInfo>>*
134-
BlockManagerPool::get_offload_block_transfer_infos() {
135-
return &offload_block_transfer_infos_;
136-
}
137-
138-
std::vector<std::vector<BlockTransferInfo>>*
139-
BlockManagerPool::get_load_block_transfer_infos() {
140-
return &load_block_transfer_infos_;
141-
}
142-
143-
void BlockManagerPool::postprocess_offload(
144-
std::vector<std::vector<folly::SemiFuture<uint32_t>>>& futures) {
145-
DCHECK(futures.size() == block_managers_.size());
146-
for (int i = 0; i < futures.size(); i++) {
147-
if (futures[i].empty()) {
148-
continue;
149-
}
150-
// TODO(kangmeng): add timeout
151-
folly::collectAll(std::move(futures[i]))
152-
.via(folly::getGlobalCPUExecutor())
153-
.thenValue([host_blocks = std::move(released_host_blocks_[i]),
154-
device_blocks = std::move(released_device_blocks_[i]),
155-
host_block_mgr_ptr = host_block_managers_[i].get(),
156-
device_block_mgr_ptr = block_managers_[i].get()](
157-
std::vector<folly::Try<uint32_t>>&& results) {
158-
for (auto&& result : results) {
159-
if (result.value() != host_blocks.size()) {
160-
LOG(FATAL) << "Offload copy fail, expected " << host_blocks.size()
161-
<< ", got " << result.value();
162-
}
163-
}
164-
host_block_mgr_ptr->cache(host_blocks);
165-
host_block_mgr_ptr->deallocate({host_blocks});
166-
device_block_mgr_ptr->deallocate({device_blocks});
167-
return 0;
168-
});
169-
}
170-
171-
offload_block_transfer_infos_.clear();
172-
released_host_blocks_.clear();
173-
released_device_blocks_.clear();
174-
offload_block_transfer_infos_.resize(block_managers_.size());
175-
released_host_blocks_.resize(block_managers_.size());
176-
released_device_blocks_.resize(block_managers_.size());
177-
}
178-
179104
void BlockManagerPool::reset_transfer_infos() {
180105
swap_block_transfer_infos_.clear();
181106
swap_block_transfer_infos_.resize(block_managers_.size());
182-
load_block_transfer_infos_.clear();
183-
load_block_transfer_infos_.resize(block_managers_.size());
184107
}
185108

186109
bool BlockManagerPool::allocate(Sequence* sequence) {
@@ -206,9 +129,6 @@ bool BlockManagerPool::allocate(Sequence* sequence, size_t num_tokens) {
206129
// first try to allocate shared blocks
207130
if (sequence->kv_state().num_kv_blocks() == 0) {
208131
allocate_shared(sequence);
209-
if (sequence->host_kv_state().num_kv_blocks() == 0) {
210-
allocate_host_shared(sequence);
211-
}
212132
}
213133

214134
const size_t num_blocks = sequence->kv_state().num_kv_blocks();
@@ -232,25 +152,6 @@ bool BlockManagerPool::allocate(Sequence* sequence, size_t num_tokens) {
232152

233153
sequence->add_kv_blocks(blocks);
234154

235-
size_t hbm_cache_token_num = sequence->kv_state().kv_cache_tokens_num();
236-
size_t host_cache_token_num = sequence->host_kv_state().kv_cache_tokens_num();
237-
if (hbm_cache_token_num < host_cache_token_num) {
238-
auto hbm_blocks = sequence->kv_state().kv_blocks();
239-
auto host_blocks = sequence->host_kv_state().kv_blocks();
240-
241-
for (int i = hbm_cache_token_num / options_.block_size();
242-
i < host_cache_token_num / options_.block_size();
243-
i++) {
244-
load_block_transfer_infos_[dp_rank].emplace_back(
245-
BlockTransferInfo(host_blocks[i].id(),
246-
hbm_blocks[i].id(),
247-
host_blocks[i].get_immutable_hash_value(),
248-
TransferType::H2D));
249-
}
250-
sequence->kv_state().incr_kv_cache_tokens_num(host_cache_token_num -
251-
hbm_cache_token_num);
252-
}
253-
254155
return true;
255156
}
256157

@@ -289,39 +190,6 @@ bool BlockManagerPool::process_beam_search(Sequence* sequence, bool need_swap) {
289190
return true;
290191
}
291192

292-
uint32_t BlockManagerPool::pre_allocate(Sequence* sequence) {
293-
DCHECK(sequence != nullptr);
294-
295-
if (!options_.enable_kvcache_store() ||
296-
sequence->kv_state().num_kv_blocks() != 0 ||
297-
sequence->host_kv_state().num_kv_blocks() != 0) {
298-
return 0;
299-
}
300-
301-
int32_t dp_rank = get_dp_rank(sequence);
302-
allocate_host_shared(sequence);
303-
304-
const size_t num_blocks = sequence->host_kv_state().num_kv_blocks();
305-
// round down to the nearest block number
306-
const size_t block_size = options_.block_size();
307-
const size_t num_additional_blocks =
308-
sequence->num_tokens() / block_size - num_blocks;
309-
if (num_additional_blocks <= 0) {
310-
return 0;
311-
}
312-
313-
auto host_blocks =
314-
host_block_managers_[dp_rank]->allocate(num_additional_blocks);
315-
if (host_blocks.size() != num_additional_blocks) {
316-
return 0;
317-
}
318-
sequence->host_kv_state().add_kv_blocks(host_blocks);
319-
PrefixCache::compute_hash_keys(
320-
sequence->tokens(), *sequence->host_kv_state().mutable_kv_blocks());
321-
322-
return num_additional_blocks;
323-
}
324-
325193
void BlockManagerPool::allocate_shared(Sequence* sequence) {
326194
// only allocate shared blocks for prefill sequences
327195
if (options_.enable_prefix_cache()) {
@@ -344,71 +212,6 @@ void BlockManagerPool::cache(Sequence* sequence) {
344212
block_managers_[dp_rank]->cache(token_ids, *blocks);
345213
}
346214

347-
void BlockManagerPool::allocate_host_shared(Sequence* sequence) {
348-
// only allocate shared blocks for prefill sequences
349-
if (sequence->host_kv_state().num_kv_blocks() != 0 ||
350-
host_block_managers_.size() != block_managers_.size()) {
351-
return;
352-
}
353-
354-
if (options_.enable_prefix_cache()) {
355-
int32_t dp_rank = get_dp_rank(sequence);
356-
std::vector<Block> shared_blocks =
357-
host_block_managers_[dp_rank]->allocate_shared(sequence->tokens());
358-
sequence->add_shared_host_kv_blocks(std::move(shared_blocks));
359-
}
360-
}
361-
362-
void BlockManagerPool::save_offload_blocks(Sequence* sequence) {
363-
DCHECK(sequence != nullptr);
364-
365-
auto* blocks = sequence->kv_state().mutable_kv_blocks();
366-
auto* host_blocks = sequence->host_kv_state().mutable_kv_blocks();
367-
368-
if (blocks->size() == 0 || host_blocks->size() >= blocks->size()) {
369-
return;
370-
}
371-
372-
int cached_block_num =
373-
sequence->host_kv_state().kv_cache_tokens_num() / options_.block_size();
374-
375-
int32_t dp_rank = get_dp_rank(sequence);
376-
377-
if (host_blocks->size() > 0) {
378-
host_block_managers_[dp_rank]->cache(sequence->tokens(), *host_blocks);
379-
}
380-
381-
size_t needed_block_num =
382-
sequence->num_tokens() / options_.block_size() - host_blocks->size();
383-
384-
if (needed_block_num == 0) {
385-
return;
386-
}
387-
388-
sequence->host_kv_state().add_kv_blocks(
389-
host_block_managers_[dp_rank]->allocate(needed_block_num));
390-
391-
for (int i = cached_block_num; i < host_blocks->size(); i++) {
392-
if (blocks->at(i).ref_count() != 2) {
393-
continue;
394-
}
395-
396-
host_blocks->at(i).set_hash_value(blocks->at(i).get_immutable_hash_value());
397-
released_host_blocks_[dp_rank].emplace_back(std::move(host_blocks->at(i)));
398-
released_device_blocks_[dp_rank].emplace_back(std::move(blocks->at(i)));
399-
offload_block_transfer_infos_[dp_rank].emplace_back(BlockTransferInfo(
400-
released_device_blocks_[dp_rank].back().id(),
401-
released_host_blocks_[dp_rank].back().id(),
402-
released_host_blocks_[dp_rank].back().get_immutable_hash_value(),
403-
released_host_blocks_[dp_rank].back().get_hash_value_len(),
404-
TransferType::D2G));
405-
}
406-
host_block_managers_[dp_rank]->cache(
407-
*sequence->host_kv_state().mutable_kv_blocks());
408-
host_block_managers_[dp_rank]->deallocate(
409-
sequence->host_kv_state().kv_blocks());
410-
}
411-
412215
void BlockManagerPool::get_merged_kvcache_event(KvCacheEvent* event) const {
413216
for (int32_t i = 0; i < block_managers_.size(); ++i) {
414217
block_managers_[i]->get_merged_kvcache_event(event);

0 commit comments

Comments
 (0)