diff --git a/xllm/core/common/global_flags.cpp b/xllm/core/common/global_flags.cpp index 645bccec3..0959d1a6c 100644 --- a/xllm/core/common/global_flags.cpp +++ b/xllm/core/common/global_flags.cpp @@ -331,6 +331,16 @@ DEFINE_bool(enable_online_preempt_offline, // --- kvcache store config --- +DEFINE_uint32(prefetch_timeout, + 0, + "Timeout when prefetching from the kv cache store."); + +DEFINE_uint32(prefetch_bacth_size, + 2, + "Copy batch size when prefetching from the kvcache store."); + +DEFINE_uint32(layers_wise_copy_batchs, 4, "Number of layer-wise H2D copy batches."); + DEFINE_double(host_blocks_factor, 0.0, "Host block factor, e.g. host block num = host_blocks_factor * " diff --git a/xllm/core/common/global_flags.h b/xllm/core/common/global_flags.h index f2594fa8d..7bcd8043c 100644 --- a/xllm/core/common/global_flags.h +++ b/xllm/core/common/global_flags.h @@ -153,6 +153,12 @@ DECLARE_bool(use_zero_evict); DECLARE_int32(max_decode_token_per_sequence); +DECLARE_uint32(prefetch_timeout); + +DECLARE_uint32(prefetch_bacth_size); + +DECLARE_uint32(layers_wise_copy_batchs); + DECLARE_string(priority_strategy); DECLARE_bool(enable_online_preempt_offline); diff --git a/xllm/core/common/options.cpp b/xllm/core/common/options.cpp index f27af7d71..2fa281587 100644 --- a/xllm/core/common/options.cpp +++ b/xllm/core/common/options.cpp @@ -53,6 +53,9 @@ std::string Options::to_string() const { << ", enable_service_routing: " << enable_service_routing() << ", enable_cache_upload: " << enable_cache_upload() << ", enable_kvcache_store: " << enable_kvcache_store() + << ", prefetch_timeout: " << prefetch_timeout() + << ", prefetch_bacth_size: " << prefetch_bacth_size() + << ", layers_wise_copy_batchs: " << layers_wise_copy_batchs() << ", store_protocol: " << store_protocol() << ", store_master_server_address: " << store_master_server_address() << ", store_metadata_server: " << store_metadata_server() diff --git a/xllm/core/common/options.h b/xllm/core/common/options.h index 0f0d7def7..5bd97f6f0 100644 --- a/xllm/core/common/options.h +++ b/xllm/core/common/options.h @@ -193,6 +193,15 @@ class Options { // Index ID for internal server ID, which must be set different values // if the model supports multiple version or there are multiple models.
PROPERTY(int64_t, server_idx) = 0; + + // Prefetch timeout for prefetch from kv cache store + PROPERTY(uint32_t, prefetch_timeout) = 0; + + // Prefetch from kvcache store copy batch size + PROPERTY(uint32_t, prefetch_bacth_size) = 2; + + // Layer wise H2D copy batchs + PROPERTY(uint32_t, layers_wise_copy_batchs) = 4; }; } // namespace xllm diff --git a/xllm/core/distributed_runtime/comm_channel.cpp b/xllm/core/distributed_runtime/comm_channel.cpp index 24cb11f4e..df48d80d5 100644 --- a/xllm/core/distributed_runtime/comm_channel.cpp +++ b/xllm/core/distributed_runtime/comm_channel.cpp @@ -372,14 +372,14 @@ void CommChannel::transfer_kv_blocks( class ClientStreamReceiver : public brpc::StreamInputHandler { private: - const std::atomic& termination_flag_; + std::shared_ptr> termination_flag_; std::shared_ptr> success_cnt_; std::promise close_promise_; std::atomic promise_set_{false}; public: - ClientStreamReceiver(const std::atomic& termination_flag, - std::shared_ptr>& success_cnt) + ClientStreamReceiver(std::shared_ptr> termination_flag, + std::shared_ptr> success_cnt) : termination_flag_(termination_flag), success_cnt_(success_cnt) {} ~ClientStreamReceiver() { @@ -398,9 +398,10 @@ class ClientStreamReceiver : public brpc::StreamInputHandler { int32_t success_cnt = std::stoi(msg_str); if (success_cnt > 0 && - !termination_flag_.load(std::memory_order_acquire)) { + !termination_flag_->load(std::memory_order_acquire)) { success_cnt_->fetch_add(success_cnt, std::memory_order_relaxed); } else { + termination_flag_->store(true, std::memory_order_release); brpc::StreamClose(id); if (!promise_set_.exchange(true)) { close_promise_.set_value(); @@ -425,9 +426,9 @@ class ClientStreamReceiver : public brpc::StreamInputHandler { }; void CommChannel::prefetch_from_storage( - const std::atomic& flag, const std::vector& block_transfer_info, - std::shared_ptr>& success_cnt) { + std::shared_ptr> flag, + std::shared_ptr> success_cnt) { proto::BlockTransferInfos pb_block_transfer_info; if (!block_transfer_info_to_proto(block_transfer_info, &pb_block_transfer_info)) { diff --git a/xllm/core/distributed_runtime/comm_channel.h b/xllm/core/distributed_runtime/comm_channel.h index c13458b97..a3867c2a2 100644 --- a/xllm/core/distributed_runtime/comm_channel.h +++ b/xllm/core/distributed_runtime/comm_channel.h @@ -98,9 +98,9 @@ class CommChannel { const std::vector& block_transfer_info); virtual void prefetch_from_storage( - const std::atomic& flag, const std::vector& block_transfer_info, - std::shared_ptr>& success_cnt); + std::shared_ptr> flag, + std::shared_ptr> success_cnt); virtual bool get_last_step_result_async( folly::Promise>& promise); diff --git a/xllm/core/distributed_runtime/remote_worker.cpp b/xllm/core/distributed_runtime/remote_worker.cpp index 4a6920179..7d646167e 100644 --- a/xllm/core/distributed_runtime/remote_worker.cpp +++ b/xllm/core/distributed_runtime/remote_worker.cpp @@ -313,15 +313,15 @@ void RemoteWorker::transfer_kv_blocks( } void RemoteWorker::prefetch_from_storage( - const std::atomic& flag, const std::vector& block_transfer_info, - std::shared_ptr>& success_cnt) { + std::shared_ptr> flag, + std::shared_ptr> success_cnt) { copy_threadpool_.schedule( [this, - flag = &flag, block_transfer_info = std::move(block_transfer_info), + flag = flag, success_cnt = success_cnt]() mutable { - channel_->prefetch_from_storage(flag, block_transfer_info, success_cnt); + channel_->prefetch_from_storage(block_transfer_info, flag, success_cnt); }); } diff --git 
a/xllm/core/distributed_runtime/remote_worker.h b/xllm/core/distributed_runtime/remote_worker.h index 99866ffa4..db3039344 100644 --- a/xllm/core/distributed_runtime/remote_worker.h +++ b/xllm/core/distributed_runtime/remote_worker.h @@ -120,9 +120,9 @@ class RemoteWorker : public WorkerClient { const std::vector& block_transfer_info) override; virtual void prefetch_from_storage( - const std::atomic& flag, const std::vector& block_transfer_info, - std::shared_ptr>& success_cnt) override; + std::shared_ptr> flag, + std::shared_ptr> success_cnt) override; // Run the model and return the output. virtual folly::SemiFuture> step_async( diff --git a/xllm/core/distributed_runtime/worker_service.cpp b/xllm/core/distributed_runtime/worker_service.cpp index 44e52bfe4..213e9a837 100644 --- a/xllm/core/distributed_runtime/worker_service.cpp +++ b/xllm/core/distributed_runtime/worker_service.cpp @@ -33,8 +33,6 @@ limitations under the License. namespace xllm { -constexpr uint32_t COPY_BATCH_SIZE = 1; - WorkerService::WorkerService(runtime::Options options, const torch::Device& device) : options_(options), device_(device), initialized_(false) { @@ -404,11 +402,8 @@ void WorkerService::TransferBlocks( std::vector block_transfer_info; uint64_t batch_id = proto_to_block_transfer_info(*req, block_transfer_info); - if (batch_id == UNINITIALIZED_BATCH_ID) { - resp->set_success_cnt(worker_->transfer_kv_blocks(block_transfer_info)); - } else { - worker_->transfer_kv_blocks(batch_id, std::move(block_transfer_info)); - } + resp->set_success_cnt( + worker_->transfer_kv_blocks(batch_id, std::move(block_transfer_info))); return; } @@ -477,22 +472,24 @@ void WorkerService::PrefetchFromStorage( auto close_future = stream_handler->get_close_future(); bool is_completed = false; - for (size_t i = 0; i < transfer_slice.size(); i += COPY_BATCH_SIZE) { - auto current_slice = transfer_slice.slice( - i, std::min(i + COPY_BATCH_SIZE, transfer_slice.size())); + for (size_t i = 0; i < transfer_slice.size(); + i += options_.prefetch_bacth_size()) { + auto current_slice = + transfer_slice.slice(i, + std::min(i + options_.prefetch_bacth_size(), + transfer_slice.size())); - auto success_cnt = worker_->prefetch_from_storage(current_slice); + auto success_cnt = worker_->transfer_kv_blocks(UNINITIALIZED_BATCH_ID, + current_slice); if (success_cnt != current_slice.size() || - i + COPY_BATCH_SIZE >= transfer_slice.size()) { + i + options_.prefetch_bacth_size() >= transfer_slice.size()) { is_completed = true; } butil::IOBuf buf; buf.append(std::to_string(success_cnt)); if (brpc::StreamWrite(*stream_id.get(), buf) != 0) { - brpc::StreamClose(*stream_id.get()); - is_completed = false; break; } @@ -505,9 +502,8 @@ void WorkerService::PrefetchFromStorage( break; } } - if (is_completed) { - close_future.wait(); - } + + close_future.wait(); brpc::StreamClose(*stream_id.get()); }); diff --git a/xllm/core/framework/block/CMakeLists.txt b/xllm/core/framework/block/CMakeLists.txt index 013d58495..b1b2aeb84 100644 --- a/xllm/core/framework/block/CMakeLists.txt +++ b/xllm/core/framework/block/CMakeLists.txt @@ -11,11 +11,13 @@ cc_library( block_manager_pool.h block_manager_impl.h concurrent_block_manager_impl.h + hierarchy_block_manager_pool.h SRCS block.cpp block_manager_pool.cpp concurrent_block_manager_impl.cpp block_manager_impl.cpp + hierarchy_block_manager_pool.cpp DEPS $<$:torch_npu> $<$:graph> diff --git a/xllm/core/framework/block/block_manager.h b/xllm/core/framework/block/block_manager.h index 0fef5e4c9..eac76c6e2 100644 --- 
a/xllm/core/framework/block/block_manager.h +++ b/xllm/core/framework/block/block_manager.h @@ -54,6 +54,8 @@ class BlockManager { virtual void deallocate(const Slice<Block>& blocks) = 0; + virtual void deallocate(std::vector<Block>& blocks) = 0; + virtual std::vector<Block> allocate(size_t num_blocks) = 0; virtual std::vector<Block> allocate_shared( diff --git a/xllm/core/framework/block/block_manager_impl.cpp b/xllm/core/framework/block/block_manager_impl.cpp index 2ba11a331..0aa81474e 100644 --- a/xllm/core/framework/block/block_manager_impl.cpp +++ b/xllm/core/framework/block/block_manager_impl.cpp @@ -93,6 +93,12 @@ void BlockManagerImpl::deallocate(const Slice<Block>& blocks) { } } +void BlockManagerImpl::deallocate(std::vector<Block>& blocks) { + Slice<Block> slice(blocks); + deallocate(slice); + blocks.clear(); +} + bool BlockManagerImpl::has_enough_blocks(uint32_t num_blocks) { if (num_blocks <= num_free_blocks_) { return true; @@ -171,7 +177,6 @@ void BlockManagerImpl::get_merged_kvcache_event(KvCacheEvent* event) const { if (events != nullptr) { event->removed_cache.merge(events->removed_cache); event->stored_cache.merge(events->stored_cache); - event->offload_cache.merge(events->offload_cache); events->clear(); } } diff --git a/xllm/core/framework/block/block_manager_impl.h b/xllm/core/framework/block/block_manager_impl.h index 6a6c96462..12e881d98 100644 --- a/xllm/core/framework/block/block_manager_impl.h +++ b/xllm/core/framework/block/block_manager_impl.h @@ -35,6 +35,8 @@ class BlockManagerImpl : public BlockManager { void deallocate(const Slice<Block>& blocks) override; + void deallocate(std::vector<Block>& blocks) override; + // allocate shared blocks when enable prefix cache std::vector<Block> allocate_shared( const Slice<int32_t>& tokens_ids, @@ -77,7 +79,7 @@ } float get_gpu_cache_usage_perc() const override { - return 1.0 - num_free_blocks_ * 1.0 / num_total_blocks(); + return 1 - static_cast<float>(num_free_blocks_) / num_total_blocks(); } // call BlockManager to free block used by Block.
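Note: the deallocate(std::vector<Block>&) overload added above is a thin wrapper over the existing Slice-based deallocate that also clears the caller's vector, so the caller hands the blocks back and is left with an empty container. A minimal usage sketch, assuming an already-constructed BlockManagerImpl named mgr (the name is illustrative, not part of this patch):

    // Hypothetical usage of the new overload; mgr is an assumed BlockManagerImpl instance.
    std::vector<Block> blocks = mgr.allocate(/*num_blocks=*/4);
    // ... the engine fills these blocks with KV data ...
    mgr.deallocate(blocks);   // internally: Slice<Block> slice(blocks); deallocate(slice); blocks.clear();
    CHECK(blocks.empty());    // the vector overload clears its argument after returning the blocks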
diff --git a/xllm/core/framework/block/block_manager_pool.cpp b/xllm/core/framework/block/block_manager_pool.cpp index abb6eaf29..a175a3d1b 100644 --- a/xllm/core/framework/block/block_manager_pool.cpp +++ b/xllm/core/framework/block/block_manager_pool.cpp @@ -24,7 +24,6 @@ BlockManagerPool::BlockManagerPool(const Options& options, int32_t dp_size) : options_(options) { CHECK(dp_size > 0) << "dp_size must be greater than 0"; block_managers_.reserve(dp_size); - host_block_managers_.reserve(dp_size); BlockManager::Options npu_options; npu_options.num_blocks(options_.num_blocks()) @@ -33,31 +32,16 @@ BlockManagerPool::BlockManagerPool(const Options& options, int32_t dp_size) .enable_disagg_pd(options_.enable_disagg_pd()) .enable_cache_upload(options_.enable_cache_upload()); - BlockManager::Options host_options = npu_options; - host_options.num_blocks(options_.host_num_blocks()) - .enable_cache_upload(false); - for (int32_t i = 0; i < dp_size; ++i) { if (options.enable_disagg_pd() || options_.enable_kvcache_store()) { block_managers_.emplace_back( std::make_unique(npu_options)); - if (options_.host_num_blocks() > 0) { - host_block_managers_.emplace_back( - std::make_unique(host_options)); - } } else { block_managers_.emplace_back( std::make_unique(npu_options)); - if (options_.host_num_blocks() > 0) { - host_block_managers_.emplace_back( - std::make_unique(host_options)); - } } } reset_transfer_infos(); - offload_block_transfer_infos_.resize(block_managers_.size()); - released_host_blocks_.resize(block_managers_.size()); - released_device_blocks_.resize(block_managers_.size()); } int32_t BlockManagerPool::get_manager_with_max_free_blocks() const { @@ -89,16 +73,6 @@ int32_t BlockManagerPool::get_dp_rank(Sequence* sequence) const { return dp_rank; } -BlockManager* BlockManagerPool::get_block_manager(Sequence* sequence, - bool is_host) { - int32_t dp_rank = get_dp_rank(sequence); - if (is_host) { - return host_block_managers_[dp_rank].get(); - } else { - return block_managers_[dp_rank].get(); - } -} - void BlockManagerPool::deallocate(Request* request) { DCHECK(request != nullptr); for (auto& sequence : request->sequences()) { @@ -117,9 +91,6 @@ void BlockManagerPool::deallocate(Sequence* sequence) { // add blocks to the prefix cache int32_t dp_rank = get_dp_rank(sequence); cache(sequence); - if (!host_block_managers_.empty()) { - save_offload_blocks(sequence); - } block_managers_[dp_rank]->deallocate(sequence->kv_state().kv_blocks()); // release the blocks after prefix cache insertion sequence->reset(); @@ -130,57 +101,9 @@ BlockManagerPool::get_swap_block_transfer_infos() { return &swap_block_transfer_infos_; } -std::vector>* -BlockManagerPool::get_offload_block_transfer_infos() { - return &offload_block_transfer_infos_; -} - -std::vector>* -BlockManagerPool::get_load_block_transfer_infos() { - return &load_block_transfer_infos_; -} - -void BlockManagerPool::postprocess_offload( - std::vector>>& futures) { - DCHECK(futures.size() == block_managers_.size()); - for (int i = 0; i < futures.size(); i++) { - if (futures[i].empty()) { - continue; - } - // TODO(kangmeng): add timeout - folly::collectAll(std::move(futures[i])) - .via(folly::getGlobalCPUExecutor()) - .thenValue([host_blocks = std::move(released_host_blocks_[i]), - device_blocks = std::move(released_device_blocks_[i]), - host_block_mgr_ptr = host_block_managers_[i].get(), - device_block_mgr_ptr = block_managers_[i].get()]( - std::vector>&& results) { - for (auto&& result : results) { - if (result.value() != host_blocks.size()) { - 
LOG(FATAL) << "Offload copy fail, expected " << host_blocks.size() - << ", got " << result.value(); - } - } - host_block_mgr_ptr->cache(host_blocks); - host_block_mgr_ptr->deallocate({host_blocks}); - device_block_mgr_ptr->deallocate({device_blocks}); - return 0; - }); - } - - offload_block_transfer_infos_.clear(); - released_host_blocks_.clear(); - released_device_blocks_.clear(); - offload_block_transfer_infos_.resize(block_managers_.size()); - released_host_blocks_.resize(block_managers_.size()); - released_device_blocks_.resize(block_managers_.size()); -} - void BlockManagerPool::reset_transfer_infos() { swap_block_transfer_infos_.clear(); swap_block_transfer_infos_.resize(block_managers_.size()); - load_block_transfer_infos_.clear(); - load_block_transfer_infos_.resize(block_managers_.size()); } bool BlockManagerPool::allocate(Sequence* sequence) { @@ -206,9 +129,6 @@ bool BlockManagerPool::allocate(Sequence* sequence, size_t num_tokens) { // first try to allocate shared blocks if (sequence->kv_state().num_kv_blocks() == 0) { allocate_shared(sequence); - if (sequence->host_kv_state().num_kv_blocks() == 0) { - allocate_host_shared(sequence); - } } const size_t num_blocks = sequence->kv_state().num_kv_blocks(); @@ -232,25 +152,6 @@ bool BlockManagerPool::allocate(Sequence* sequence, size_t num_tokens) { sequence->add_kv_blocks(blocks); - size_t hbm_cache_token_num = sequence->kv_state().kv_cache_tokens_num(); - size_t host_cache_token_num = sequence->host_kv_state().kv_cache_tokens_num(); - if (hbm_cache_token_num < host_cache_token_num) { - auto hbm_blocks = sequence->kv_state().kv_blocks(); - auto host_blocks = sequence->host_kv_state().kv_blocks(); - - for (int i = hbm_cache_token_num / options_.block_size(); - i < host_cache_token_num / options_.block_size(); - i++) { - load_block_transfer_infos_[dp_rank].emplace_back( - BlockTransferInfo(host_blocks[i].id(), - hbm_blocks[i].id(), - host_blocks[i].get_immutable_hash_value(), - TransferType::H2D)); - } - sequence->kv_state().incr_kv_cache_tokens_num(host_cache_token_num - - hbm_cache_token_num); - } - return true; } @@ -289,39 +190,6 @@ bool BlockManagerPool::process_beam_search(Sequence* sequence, bool need_swap) { return true; } -uint32_t BlockManagerPool::pre_allocate(Sequence* sequence) { - DCHECK(sequence != nullptr); - - if (!options_.enable_kvcache_store() || - sequence->kv_state().num_kv_blocks() != 0 || - sequence->host_kv_state().num_kv_blocks() != 0) { - return 0; - } - - int32_t dp_rank = get_dp_rank(sequence); - allocate_host_shared(sequence); - - const size_t num_blocks = sequence->host_kv_state().num_kv_blocks(); - // round down to the nearest block number - const size_t block_size = options_.block_size(); - const size_t num_additional_blocks = - sequence->num_tokens() / block_size - num_blocks; - if (num_additional_blocks <= 0) { - return 0; - } - - auto host_blocks = - host_block_managers_[dp_rank]->allocate(num_additional_blocks); - if (host_blocks.size() != num_additional_blocks) { - return 0; - } - - PrefixCache::compute_hash_keys(sequence->tokens(), host_blocks); - - sequence->host_kv_state().add_kv_blocks(host_blocks); - return num_additional_blocks; -} - void BlockManagerPool::allocate_shared(Sequence* sequence) { // only allocate shared blocks for prefill sequences if (options_.enable_prefix_cache()) { @@ -344,70 +212,6 @@ void BlockManagerPool::cache(Sequence* sequence) { block_managers_[dp_rank]->cache(token_ids, *blocks); } -void BlockManagerPool::allocate_host_shared(Sequence* sequence) { - // only 
allocate shared blocks for prefill sequences - if (sequence->host_kv_state().num_kv_blocks() != 0 || - host_block_managers_.size() != block_managers_.size()) { - return; - } - - if (options_.enable_prefix_cache()) { - int32_t dp_rank = get_dp_rank(sequence); - std::vector shared_blocks = - host_block_managers_[dp_rank]->allocate_shared(sequence->tokens()); - sequence->add_shared_host_kv_blocks(std::move(shared_blocks)); - } -} - -void BlockManagerPool::save_offload_blocks(Sequence* sequence) { - DCHECK(sequence != nullptr); - - auto* blocks = sequence->kv_state().mutable_kv_blocks(); - auto* host_blocks = sequence->host_kv_state().mutable_kv_blocks(); - - if (blocks->size() == 0 || host_blocks->size() >= blocks->size()) { - return; - } - - int cached_block_num = - sequence->host_kv_state().kv_cache_tokens_num() / options_.block_size(); - - int32_t dp_rank = get_dp_rank(sequence); - - if (host_blocks->size() > 0) { - host_block_managers_[dp_rank]->cache(sequence->tokens(), *host_blocks); - } - - size_t needed_block_num = - sequence->num_tokens() / options_.block_size() - host_blocks->size(); - - if (needed_block_num == 0) { - return; - } - - sequence->host_kv_state().add_kv_blocks( - host_block_managers_[dp_rank]->allocate(needed_block_num)); - - for (int i = cached_block_num; i < host_blocks->size(); i++) { - if (blocks->at(i).ref_count() != 2) { - continue; - } - - host_blocks->at(i).set_hash_value(blocks->at(i).get_immutable_hash_value()); - released_host_blocks_[dp_rank].emplace_back(std::move(host_blocks->at(i))); - released_device_blocks_[dp_rank].emplace_back(std::move(blocks->at(i))); - offload_block_transfer_infos_[dp_rank].emplace_back(BlockTransferInfo( - released_device_blocks_[dp_rank].back().id(), - released_host_blocks_[dp_rank].back().id(), - released_host_blocks_[dp_rank].back().get_immutable_hash_value(), - TransferType::D2G)); - } - host_block_managers_[dp_rank]->cache( - *sequence->host_kv_state().mutable_kv_blocks()); - host_block_managers_[dp_rank]->deallocate( - sequence->host_kv_state().kv_blocks()); -} - void BlockManagerPool::get_merged_kvcache_event(KvCacheEvent* event) const { for (int32_t i = 0; i < block_managers_.size(); ++i) { block_managers_[i]->get_merged_kvcache_event(event); diff --git a/xllm/core/framework/block/block_manager_pool.h b/xllm/core/framework/block/block_manager_pool.h index 21d6f6a2f..e948d1ecc 100644 --- a/xllm/core/framework/block/block_manager_pool.h +++ b/xllm/core/framework/block/block_manager_pool.h @@ -23,7 +23,7 @@ limitations under the License. 
namespace xllm { -class BlockManagerPool final : public KVCacheManager { +class BlockManagerPool : public KVCacheManager { public: struct Options { PROPERTY(uint32_t, num_blocks) = 0; @@ -39,73 +39,52 @@ class BlockManagerPool final : public KVCacheManager { ~BlockManagerPool() = default; - BlockManager* get_block_manager(Sequence* sequence, bool is_host); - - bool allocate(Sequence* sequence) override; - bool allocate(std::vector& sequences) override; - bool allocate(Sequence* sequence, size_t num_tokens) override; - - uint32_t pre_allocate(Sequence* sequence) override; + virtual bool allocate(Sequence* sequence) override; + virtual bool allocate(std::vector& sequences) override; + virtual bool allocate(Sequence* sequence, size_t num_tokens) override; // Try to allocate blocks with num_tokens, // return {} if not enough blocks - std::vector allocate(size_t num_tokens, int32_t& dp_rank) override; - - void deallocate(Request* request) override; - void deallocate(std::vector& sequences) override; - void deallocate(Sequence* sequence) override; - - void allocate_shared(Sequence* sequence) override; - void cache(Sequence* sequence) override; - - std::vector>* get_swap_block_transfer_infos() - override; - std::vector>* - get_offload_block_transfer_infos() override; - std::vector>* get_load_block_transfer_infos() - override; - void postprocess_offload( - std::vector>>& futures) override; - void reset_transfer_infos() override; - - void get_merged_kvcache_event(KvCacheEvent* event) const; - float get_gpu_cache_usage_perc() const; - - uint32_t num_blocks() const override; - int32_t block_size() const override; - std::vector num_blocks_in_prefix_cache() const override; - std::vector num_free_blocks() const override; - std::vector num_used_blocks() const override; - double kv_cache_utilization() const override; - bool allow_host_block_extend() override { - return !host_block_managers_.empty(); - }; + virtual std::vector allocate(size_t num_tokens, + int32_t& dp_rank) override; + + virtual void deallocate(Request* request) override; + virtual void deallocate(std::vector& sequences) override; + virtual void deallocate(Sequence* sequence) override; + + virtual void allocate_shared(Sequence* sequence) override; + virtual void cache(Sequence* sequence) override; + + virtual std::vector>* + get_swap_block_transfer_infos() override; + virtual void reset_transfer_infos() override; + + virtual void get_merged_kvcache_event(KvCacheEvent* event) const; + virtual float get_gpu_cache_usage_perc() const; + + virtual uint32_t num_blocks() const override; + virtual int32_t block_size() const override; + virtual std::vector num_blocks_in_prefix_cache() const override; + virtual std::vector num_free_blocks() const override; + virtual std::vector num_used_blocks() const override; + virtual double kv_cache_utilization() const override; // get the options for the block manager const Options& options() const { return options_; } - private: + protected: int32_t get_manager_with_max_free_blocks() const; int32_t get_dp_rank(Sequence* sequence) const; - void allocate_host_shared(Sequence* sequence); - void save_offload_blocks(Sequence* sequence); - bool process_beam_search(Sequence* sequence, bool need_swap = false); private: - std::vector> block_managers_; - std::vector> host_block_managers_; + std::vector> swap_block_transfer_infos_; + protected: // the options for the block manager Options options_; - - // BlockTransferInfo per step - std::vector> swap_block_transfer_infos_; - std::vector> load_block_transfer_infos_; - 
std::vector<std::vector<BlockTransferInfo>> offload_block_transfer_infos_; - std::vector<std::vector<Block>> released_host_blocks_; - std::vector<std::vector<Block>> released_device_blocks_; + std::vector<std::unique_ptr<BlockManager>> block_managers_; }; } // namespace xllm diff --git a/xllm/core/framework/block/concurrent_block_manager_impl.cpp b/xllm/core/framework/block/concurrent_block_manager_impl.cpp index b80267c43..d3834028b 100644 --- a/xllm/core/framework/block/concurrent_block_manager_impl.cpp +++ b/xllm/core/framework/block/concurrent_block_manager_impl.cpp @@ -30,6 +30,11 @@ void ConcurrentBlockManagerImpl::deallocate(const Slice<Block>& blocks) { BlockManagerImpl::deallocate(blocks); } +void ConcurrentBlockManagerImpl::deallocate(std::vector<Block>& blocks) { + std::lock_guard<std::mutex> lock(mutex_); + BlockManagerImpl::deallocate(blocks); +} + std::vector<Block> ConcurrentBlockManagerImpl::allocate_shared( const Slice<int32_t>& tokens_ids, const Slice<Block>& existed_shared_blocks) { diff --git a/xllm/core/framework/block/concurrent_block_manager_impl.h b/xllm/core/framework/block/concurrent_block_manager_impl.h index 68c2804c6..37c87ab22 100644 --- a/xllm/core/framework/block/concurrent_block_manager_impl.h +++ b/xllm/core/framework/block/concurrent_block_manager_impl.h @@ -30,6 +30,8 @@ class ConcurrentBlockManagerImpl : public BlockManagerImpl { void deallocate(const Slice<Block>& blocks) override; + void deallocate(std::vector<Block>& blocks) override; + // try to share blocks among sequences with the same prefix std::vector<Block> allocate_shared( const Slice<int32_t>& tokens_ids, diff --git a/xllm/core/framework/block/hierarchy_block_manager_pool.cpp b/xllm/core/framework/block/hierarchy_block_manager_pool.cpp new file mode 100644 index 000000000..89496d383 --- /dev/null +++ b/xllm/core/framework/block/hierarchy_block_manager_pool.cpp @@ -0,0 +1,281 @@ +/* Copyright 2025 The xLLM Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://github.com/jd-opensource/xllm/blob/main/LICENSE + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+==============================================================================*/ + +#include "hierarchy_block_manager_pool.h" + +#include "block_manager_impl.h" +#include "concurrent_block_manager_impl.h" + +namespace xllm { + +HierarchyBlockManagerPool::HierarchyBlockManagerPool( + const BlockManagerPool::Options& options, + Engine* engine, + int32_t dp_size) + : engine_(engine), BlockManagerPool(options, dp_size) { + CHECK(dp_size > 0) << "dp_size must be greater than 0"; + host_block_managers_.reserve(dp_size); + + BlockManager::Options host_options; + host_options.num_blocks(options_.num_blocks()) + .block_size(options_.block_size()) + .enable_prefix_cache(options_.enable_prefix_cache()) + .enable_disagg_pd(options_.enable_disagg_pd()) + .num_blocks(options_.host_num_blocks()) + .enable_cache_upload(false); + + for (int32_t i = 0; i < dp_size; ++i) { + if (options.enable_disagg_pd() || options_.enable_kvcache_store()) { + host_block_managers_.emplace_back( + std::make_unique(host_options)); + } else { + host_block_managers_.emplace_back( + std::make_unique(host_options)); + } + } + + load_block_transfer_infos_.resize(host_block_managers_.size()); + offload_block_transfer_infos_.resize(host_block_managers_.size()); + saved_host_blocks_.resize(host_block_managers_.size()); + saved_device_blocks_.resize(host_block_managers_.size()); +} + +void HierarchyBlockManagerPool::deallocate(Sequence* sequence) { + DCHECK(sequence != nullptr); + // add blocks to the prefix cache + int32_t dp_rank = BlockManagerPool::get_dp_rank(sequence); + BlockManagerPool::cache(sequence); + + auto* blocks = sequence->kv_state().mutable_kv_blocks(); + auto* host_blocks = sequence->host_kv_state().mutable_kv_blocks(); + + if (blocks->size() == 0 || host_blocks->size() >= blocks->size()) { + return; + } + + size_t cached_block_num = + sequence->host_kv_state().kv_cache_tokens_num() / options_.block_size(); + + if (host_blocks->size() > 0) { + host_block_managers_[dp_rank]->cache(sequence->tokens(), *host_blocks); + } + + size_t needed_block_num = + sequence->num_tokens() / options_.block_size() - host_blocks->size(); + + if (needed_block_num == 0) { + return; + } + + sequence->host_kv_state().add_kv_blocks( + host_block_managers_[dp_rank]->allocate(needed_block_num)); + + for (size_t i = cached_block_num; i < host_blocks->size(); i++) { + if (blocks->at(i).ref_count() != 2) { + continue; + } + + host_blocks->at(i).set_hash_value(blocks->at(i).get_immutable_hash_value()); + saved_host_blocks_[dp_rank].emplace_back(std::move(host_blocks->at(i))); + saved_device_blocks_[dp_rank].emplace_back(std::move(blocks->at(i))); + offload_block_transfer_infos_[dp_rank].emplace_back(BlockTransferInfo( + saved_device_blocks_[dp_rank].back().id(), + saved_host_blocks_[dp_rank].back().id(), + saved_host_blocks_[dp_rank].back().get_immutable_hash_value(), + saved_host_blocks_[dp_rank].back().get_hash_value_len(), + TransferType::D2G)); + } + host_block_managers_[dp_rank]->cache( + *sequence->host_kv_state().mutable_kv_blocks()); + host_block_managers_[dp_rank]->deallocate( + sequence->host_kv_state().kv_blocks()); + + block_managers_[dp_rank]->deallocate(sequence->kv_state().kv_blocks()); + // release the blocks after prefix cache insertion + sequence->reset(); +} + +bool HierarchyBlockManagerPool::allocate(Sequence* sequence, + size_t num_tokens) { + BlockManagerPool::allocate(sequence, num_tokens); + + if (sequence->host_kv_state().num_kv_blocks() == 0 && + sequence->stage() != SequenceStage::DECODE) { + 
allocate_host_shared(sequence); + } + + int32_t dp_rank = BlockManagerPool::get_dp_rank(sequence); + size_t hbm_cache_token_num = sequence->kv_state().kv_cache_tokens_num(); + size_t host_cache_token_num = sequence->host_kv_state().kv_cache_tokens_num(); + if (hbm_cache_token_num < host_cache_token_num) { + auto hbm_blocks = sequence->kv_state().kv_blocks(); + auto host_blocks = sequence->host_kv_state().kv_blocks(); + + for (int i = hbm_cache_token_num / options_.block_size(); + i < host_cache_token_num / options_.block_size(); + i++) { + load_block_transfer_infos_[dp_rank].emplace_back( + BlockTransferInfo(host_blocks[i].id(), + hbm_blocks[i].id(), + host_blocks[i].get_immutable_hash_value(), + TransferType::H2D)); + } + sequence->kv_state().incr_kv_cache_tokens_num(host_cache_token_num - + hbm_cache_token_num); + } + return true; +} + +void HierarchyBlockManagerPool::allocate_host_shared(Sequence* sequence) { + if (options_.enable_prefix_cache()) { + int32_t dp_rank = BlockManagerPool::get_dp_rank(sequence); + std::vector shared_blocks = + host_block_managers_[dp_rank]->allocate_shared(sequence->tokens()); + sequence->add_shared_host_kv_blocks(std::move(shared_blocks)); + } +} + +void HierarchyBlockManagerPool::prefetch_from_storage( + std::shared_ptr& request) { + if (!options_.enable_kvcache_store()) { + return; + } + + for (auto& prefill_sequence : request->sequences()) { + DCHECK(prefill_sequence.get() != nullptr); + + int32_t dp_rank = BlockManagerPool::get_dp_rank(prefill_sequence.get()); + std::vector shared_blocks = + host_block_managers_[dp_rank]->allocate_shared( + prefill_sequence->tokens()); + prefill_sequence->add_shared_host_kv_blocks(std::move(shared_blocks)); + + const size_t num_blocks = prefill_sequence->host_kv_state().num_kv_blocks(); + // round down to the nearest block number + const size_t block_size = options_.block_size(); + const size_t num_additional_blocks = + prefill_sequence->num_tokens() / block_size - num_blocks; + if (num_additional_blocks <= 0) { + return; + } + + auto host_blocks = + host_block_managers_[dp_rank]->allocate(num_additional_blocks); + if (host_blocks.size() != num_additional_blocks) { + return; + } + prefill_sequence->host_kv_state().add_kv_blocks(host_blocks); + PrefixCache::compute_hash_keys( + prefill_sequence->tokens(), + *prefill_sequence->host_kv_state().mutable_kv_blocks()); + + if (num_additional_blocks > 0) { + const auto host_blocks = prefill_sequence->host_kv_state().kv_blocks(); + std::vector block_transfer_infos; + block_transfer_infos.reserve(num_additional_blocks); + for (int i = host_blocks.size() - num_additional_blocks; + i < host_blocks.size(); + i++) { + block_transfer_infos.emplace_back( + BlockTransferInfo(-1, + host_blocks[i].id(), + host_blocks[i].get_immutable_hash_value(), + TransferType::G2H)); + } + + engine_->prefetch_from_storage(prefill_sequence->dp_rank(), + std::move(block_transfer_infos), + prefill_sequence->get_termination_flag(), + prefill_sequence->get_prefetch_results()); + } + } +} + +bool HierarchyBlockManagerPool::update_prefetch_result( + std::shared_ptr& request, + const uint32_t timeout) { + if (!options_.enable_kvcache_store()) { + return true; + } + + bool prefetch_result = true; + for (auto& prefill_sequence : request->sequences()) { + prefetch_result &= prefill_sequence->update_prefetch_result(timeout); + } + return prefetch_result; +} + +void HierarchyBlockManagerPool::transfer_blocks( + std::optional> batches) { + if (batches.has_value()) { + // load blocks from host to device + for (int i 
= 0; i < batches->size(); i++) { + if (!load_block_transfer_infos_[i].empty()) { + batches->at(i).set_batch_id(); + engine_->transfer_kv_blocks(i, + batches->at(i).batch_id(), + std::move(load_block_transfer_infos_[i])); + } + } + + load_block_transfer_infos_.clear(); + load_block_transfer_infos_.resize(host_block_managers_.size()); + } + + // offload blocks from device to host and kvcache store + for (int i = 0; i < offload_block_transfer_infos_.size(); i++) { + if (!offload_block_transfer_infos_[i].empty()) { + folly::collectAll(std::move(engine_->transfer_kv_blocks( + i, std::move(offload_block_transfer_infos_[i])))) + .via(folly::getGlobalCPUExecutor()) + .thenValue([host_blocks = std::move(saved_host_blocks_[i]), + device_blocks = std::move(saved_device_blocks_[i]), + host_block_mgr_ptr = host_block_managers_[i].get(), + device_block_mgr_ptr = block_managers_[i].get()]( + std::vector>&& results) { + for (auto&& result : results) { + if (result.value() != host_blocks.size()) { + LOG(FATAL) << "Offload copy fail, expected " + << host_blocks.size() << ", got " << result.value(); + } + } + host_block_mgr_ptr->cache(host_blocks); + host_block_mgr_ptr->deallocate({host_blocks}); + device_block_mgr_ptr->deallocate({device_blocks}); + return 0; + }); + } + } + + offload_block_transfer_infos_.clear(); + saved_host_blocks_.clear(); + saved_device_blocks_.clear(); + offload_block_transfer_infos_.resize(host_block_managers_.size()); + saved_host_blocks_.resize(host_block_managers_.size()); + saved_device_blocks_.resize(host_block_managers_.size()); +} + +void HierarchyBlockManagerPool::get_merged_kvcache_event( + KvCacheEvent* event) const { + if (host_block_managers_.empty()) { + BlockManagerPool::get_merged_kvcache_event(event); + } else { + for (int32_t i = 0; i < host_block_managers_.size(); ++i) { + host_block_managers_[i]->get_merged_kvcache_event(event); + } + } +} + +} // namespace xllm diff --git a/xllm/core/framework/block/hierarchy_block_manager_pool.h b/xllm/core/framework/block/hierarchy_block_manager_pool.h new file mode 100644 index 000000000..0248648a4 --- /dev/null +++ b/xllm/core/framework/block/hierarchy_block_manager_pool.h @@ -0,0 +1,59 @@ +/* Copyright 2025 The xLLM Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://github.com/jd-opensource/xllm/blob/main/LICENSE + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+==============================================================================*/ + +#pragma once + +#include "block_manager_pool.h" +#include "runtime/engine.h" + +namespace xllm { + +class Engine; + +class HierarchyBlockManagerPool : public BlockManagerPool { + public: + explicit HierarchyBlockManagerPool(const BlockManagerPool::Options& options, + Engine* engine, + int32_t dp_size = 1); + ~HierarchyBlockManagerPool() = default; + + bool allocate(Sequence* sequence, size_t num_tokens) override; + + void deallocate(Sequence* sequence) override; + + void transfer_blocks(std::optional> batches) override; + + void prefetch_from_storage(std::shared_ptr& request) override; + + bool update_prefetch_result(std::shared_ptr& request, + const uint32_t timeout) override; + + void get_merged_kvcache_event(KvCacheEvent* event) const override; + + private: + void allocate_host_shared(Sequence* sequence); + + private: + Engine* engine_; + std::vector> host_block_managers_; + + // BlockTransferInfo per step + std::vector> load_block_transfer_infos_; + std::vector> offload_block_transfer_infos_; + std::vector> saved_host_blocks_; + std::vector> saved_device_blocks_; +}; + +} // namespace xllm diff --git a/xllm/core/framework/block/kv_cache_manager.h b/xllm/core/framework/block/kv_cache_manager.h index 36b41d204..586bdec6a 100644 --- a/xllm/core/framework/block/kv_cache_manager.h +++ b/xllm/core/framework/block/kv_cache_manager.h @@ -18,6 +18,7 @@ limitations under the License. #include #include "common/macros.h" +#include "framework/batch/batch.h" #include "framework/model/model_input_params.h" #include "framework/request/request.h" #include "framework/request/sequence.h" @@ -32,7 +33,19 @@ class KVCacheManager { virtual bool allocate(std::vector& sequences) = 0; virtual bool allocate(Sequence* sequence, size_t num_tokens) = 0; - virtual uint32_t pre_allocate(Sequence* sequence) = 0; + virtual void transfer_blocks(std::optional> batches) { + return; + }; + + virtual void prefetch_from_storage(std::shared_ptr& request) { + return; + }; + + virtual bool update_prefetch_result(std::shared_ptr& request, + const uint32_t timeout) { + return true; + }; + virtual std::vector allocate(size_t num_tokens, int32_t& dp_rank) = 0; virtual void deallocate(Request* request) = 0; @@ -45,15 +58,6 @@ class KVCacheManager { virtual std::vector>* get_swap_block_transfer_infos() = 0; - virtual std::vector>* - get_offload_block_transfer_infos() = 0; - - virtual std::vector>* - get_load_block_transfer_infos() = 0; - - virtual void postprocess_offload( - std::vector>>& futures) = 0; - virtual void reset_transfer_infos() = 0; virtual uint32_t num_blocks() const = 0; @@ -62,7 +66,6 @@ class KVCacheManager { virtual std::vector num_free_blocks() const = 0; virtual std::vector num_used_blocks() const = 0; virtual double kv_cache_utilization() const = 0; - virtual bool allow_host_block_extend() { return false; }; protected: KVCacheManager() = default; diff --git a/xllm/core/framework/kv_cache/CMakeLists.txt b/xllm/core/framework/kv_cache/CMakeLists.txt index 423165b97..a03cfb8e1 100644 --- a/xllm/core/framework/kv_cache/CMakeLists.txt +++ b/xllm/core/framework/kv_cache/CMakeLists.txt @@ -15,6 +15,7 @@ cc_library( $<$:llm_data_dist_transfer.h> $<$:spec_kv_cache_transfer.h> kv_cache_store.h + hierarchy_kv_cache_transfer.h SRCS embedding_allocator.cpp $<$:hccl_kv_cache_transfer.cpp> @@ -23,6 +24,7 @@ cc_library( $<$:llm_data_dist_transfer.cpp> $<$:spec_kv_cache_transfer.cpp> kv_cache_store.cpp + hierarchy_kv_cache_transfer.cpp DEPS 
:common $<$:graph> diff --git a/xllm/core/framework/kv_cache/hierarchy_kv_cache_transfer.cpp b/xllm/core/framework/kv_cache/hierarchy_kv_cache_transfer.cpp new file mode 100644 index 000000000..432220678 --- /dev/null +++ b/xllm/core/framework/kv_cache/hierarchy_kv_cache_transfer.cpp @@ -0,0 +1,544 @@ +/* Copyright 2025 The xLLM Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://github.com/jd-opensource/xllm/blob/main/LICENSE + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#include "hierarchy_kv_cache_transfer.h" + +#include +#include + +#include + +#include "kv_cache_store.h" +namespace xllm { + +constexpr uint64_t MBUF_SIZE = 128 * 1024 * 1024; +constexpr uint32_t BATCH_COPY_MAX_SIZE = 4096; +constexpr uint32_t TIMEOUT_S = 60; // second +constexpr uint32_t TIMEOUT_MS = 60000; // millisecond + +HierarchyKVCacheTransfer::HierarchyKVCacheTransfer( + const Options& options, + const torch::Device& device, + std::vector* kv_caches_ptr) + : options_(options), device_(device), kv_caches_ptr_(kv_caches_ptr) { + device_.set_device(); + device_.init_device_context(); + h2d_threadpool_ = std::make_unique( + 2, [this]() mutable { device_.set_device(); }); + d2h_threadpool_ = std::make_unique( + 5, [this]() mutable { device_.set_device(); }); + for (int i = 0; i < h2d_threadpool_->size() + d2h_threadpool_->size(); i++) { + copy_stream_.enqueue(device_.get_stream_from_pool(TIMEOUT_MS)); + } + + if (options_.host_blocks_factor() > 1) { + create_page_aligned_host_cache(); + } + + if (options_.enable_kvcache_store()) { + StoreConfig config; + config.localhost_name = options_.store_local_hostname(); + config.protocol = options_.store_protocol(); + config.metadata_server = options_.store_metadata_server(); + config.master_server_address = options_.store_master_server_address(); + config.tp_rank = options_.tp_rank(); + config.total_size = page_aligned_data_size_; + config.tensor_data = page_aligned_data_; + + if (!KVCacheStore::get_instance().init(config, &host_kv_caches_)) { + LOG(FATAL) << "Init KVCacheStore fail!"; + } + } +} + +HierarchyKVCacheTransfer::~HierarchyKVCacheTransfer() { + if (page_aligned_data_ != nullptr) { +#if defined(USE_NPU) + aclrtHostUnregister(page_aligned_data_); +#endif + munlock(page_aligned_data_, page_aligned_data_size_); + munmap(page_aligned_data_, page_aligned_data_size_); + } +} + +uint32_t HierarchyKVCacheTransfer::transfer_kv_blocks( + const uint64_t batch_id, + const std::vector& block_transfer_info) { + CHECK(!block_transfer_info.empty()); + + switch (block_transfer_info[0].transfer_type) { + case TransferType::H2D: { + h2d_threadpool_->schedule( + [this, + batch_id = batch_id, + block_transfer_info = std::move(block_transfer_info)]() mutable { + Slice info_slice{block_transfer_info}; + h2d_batch_copy(batch_id, info_slice); + }); + break; + } + case TransferType::D2G: + return offload_kv_blocks(std::move(block_transfer_info)); + case TransferType::G2D: { + // TODO load_kv_blocks async + LOG(ERROR) << "Unsupport copy type G2D."; + break; + } + default: + LOG(ERROR) 
<< "Unsupport copy type: " + << uint32_t(block_transfer_info[0].transfer_type); + break; + } + return 0; +} + +uint32_t HierarchyKVCacheTransfer::transfer_kv_blocks( + const uint64_t batch_id, + Slice& block_transfer_info) { + CHECK(!block_transfer_info.empty()); + + switch (block_transfer_info[0].transfer_type) { + case TransferType::G2H: + return load_from_store(block_transfer_info); + default: + LOG(ERROR) << "Unsupport copy type: " + << uint32_t(block_transfer_info[0].transfer_type); + break; + } + return 0; +} + +void HierarchyKVCacheTransfer::set_layer_synchronizer( + ModelInputParams& params) { +#if defined(USE_NPU) + { + std::lock_guard lock(mutex_); + if (layer_wise_load_synchronizer_.count(params.batch_id) != 0) { + params.layer_wise_load_synchronizer = + layer_wise_load_synchronizer_[params.batch_id]; + layer_wise_load_synchronizer_.erase(params.batch_id); + uint32_t event_cnt = + params.layer_wise_load_synchronizer->get_event_size(); + params.layers_per_bacth_copy = + (options_.layers() + event_cnt - 1) / event_cnt; + } + } +#endif +} + +uint32_t HierarchyKVCacheTransfer::offload_kv_blocks( + const std::vector& block_transfer_info) { + if (block_transfer_info.empty()) { + return 0; + } + + const int64_t num_layers = options_.layers(); + uint32_t max_blocks_per_batch = + BATCH_COPY_MAX_SIZE / (cache_tensor_cnt_ * num_layers); + uint32_t total_slice = + (block_transfer_info.size() + max_blocks_per_batch - 1) / + max_blocks_per_batch; + + Slice transfer_info_slice(block_transfer_info); + std::vector> futures; + futures.reserve(total_slice); + + for (size_t i = 0; i < block_transfer_info.size(); + i += max_blocks_per_batch) { + folly::Promise promise; + auto future = promise.getSemiFuture(); + auto slice = transfer_info_slice.slice( + i, std::min(i + max_blocks_per_batch, block_transfer_info.size())); + + d2h_threadpool_->schedule([this, + promise = std::move(promise), + slice = std::move(slice)]() mutable { + bool ret = d2h_batch_copy(slice); + auto success_cnt = offload_to_store(slice); + if (success_cnt != slice.size()) { + LOG(WARNING) << "KVCacheStore not all put success: " << success_cnt + << "/" << slice.size(); + } + promise.setValue(ret); + }); + + futures.emplace_back(std::move(future)); + } + + if (!futures.empty()) { + try { + // TODO(kangmeng): add timeout + auto all_results = folly::collect(futures).get(); + if (!std::all_of(all_results.begin(), all_results.end(), [](bool result) { + return result; + })) { + LOG(FATAL) << "Not all D2H copy returned true"; + } + } catch (const std::exception& e) { + LOG(FATAL) << "Future execution failed: " << e.what(); + } + } + + return block_transfer_info.size(); +} + +bool HierarchyKVCacheTransfer::d2h_batch_copy( + Slice& block_transfer_info) { +#if defined(USE_NPU) + const int64_t num_layers = options_.layers(); + uint32_t num_batches = + block_transfer_info.size() * num_layers * cache_tensor_cnt_; + void** srcs = new void*[num_batches]; + void** dsts = new void*[num_batches]; + size_t* copy_size = new size_t[num_batches]; + aclrtMemcpyBatchAttr attrs[1] = {d2h_attrs_}; + size_t attrs_indexes[1] = {0}; + size_t fail_index; + uint32_t curr_index = 0; + + for (const auto& info : block_transfer_info) { + auto dst_k_cache = host_kv_caches_.at(info.dst_block_id).get_k_cache(); + auto dst_v_cache = host_kv_caches_.at(info.dst_block_id).get_v_cache(); + auto dst_index_cache = + host_kv_caches_.at(info.dst_block_id).get_index_cache(); + + for (int layer_id = 0; layer_id < num_layers; layer_id++) { + auto src_k_cache = 
kv_caches_ptr_->at(layer_id).get_k_cache(); + srcs[curr_index] = src_k_cache[info.src_block_id].data_ptr(); + dsts[curr_index] = dst_k_cache[layer_id].data_ptr(); + copy_size[curr_index] = cache_size_per_layer_[0]; + curr_index++; + + if (cache_size_per_layer_[1] != 0) { + auto src_v_cache = kv_caches_ptr_->at(layer_id).get_v_cache(); + srcs[curr_index] = src_v_cache[info.src_block_id].data_ptr(); + dsts[curr_index] = dst_v_cache[layer_id].data_ptr(); + copy_size[curr_index] = cache_size_per_layer_[1]; + curr_index++; + } + + if (cache_size_per_layer_[2] != 0) { + auto src_index_cache = kv_caches_ptr_->at(layer_id).get_index_cache(); + srcs[curr_index] = src_index_cache[info.src_block_id].data_ptr(); + dsts[curr_index] = dst_index_cache[layer_id].data_ptr(); + copy_size[curr_index] = cache_size_per_layer_[2]; + curr_index++; + } + } + } + + std::unique_ptr stream; + copy_stream_.wait_dequeue(stream); + c10::StreamGuard streamGuard = stream->set_stream_guard(); + + // TODO(kangmeng): change to async API + aclError ret = aclrtMemcpyBatch(dsts, + copy_size, + srcs, + copy_size, + num_batches, + attrs, + attrs_indexes, + 1, + &fail_index); + if (ret != 0 || fail_index != SIZE_MAX) { + LOG(ERROR) << "aclrtMemcpyBatch error: " << ret + << ", fail_index:" << fail_index; + copy_stream_.enqueue(std::move(stream)); + return false; + } + + if (stream->synchronize() != 0) { + LOG(ERROR) << "d2h_batch_copy timeout!"; + copy_stream_.enqueue(std::move(stream)); + return false; + } + + copy_stream_.enqueue(std::move(stream)); + + delete[] dsts; + delete[] srcs; + delete[] copy_size; +#endif + return true; +} + +bool HierarchyKVCacheTransfer::h2d_batch_copy( + const uint64_t batch_id, + Slice& block_transfer_info) { +#if defined(USE_NPU) + CHECK(block_transfer_info.size() < BATCH_COPY_MAX_SIZE / cache_tensor_cnt_) + << "h2d_batch_copy support copy blocks less than " + << BATCH_COPY_MAX_SIZE / cache_tensor_cnt_ << ", but got " + << block_transfer_info.size(); + + if (block_transfer_info.empty()) { + return true; + } + + const int64_t num_layers = options_.layers(); + uint32_t layers_per_bacth_copy = + num_layers / options_.layers_wise_copy_batchs(); + uint32_t num_batches = block_transfer_info.size() * cache_tensor_cnt_; + while (num_batches * layers_per_bacth_copy > BATCH_COPY_MAX_SIZE) { + layers_per_bacth_copy--; + } + + uint32_t copy_cnt = + (num_layers + layers_per_bacth_copy - 1) / layers_per_bacth_copy; + auto synchronizer = std::make_shared(copy_cnt); + { + std::lock_guard lock(mutex_); + if (layer_wise_load_synchronizer_.count(batch_id) != 0) { + LOG(FATAL) << "Batch id already exists!"; + } + layer_wise_load_synchronizer_[batch_id] = synchronizer; + } + + aclrtMemcpyBatchAttr attrs[1] = {h2d_attrs_}; + size_t attrs_indexes[1] = {0}; + + std::unique_ptr stream; + copy_stream_.wait_dequeue(stream); + c10::StreamGuard streamGuard = stream->set_stream_guard(); + aclError ret = 0; + + void** srcs = new void*[num_batches * layers_per_bacth_copy]; + void** dsts = new void*[num_batches * layers_per_bacth_copy]; + size_t* copy_size = new size_t[num_batches * layers_per_bacth_copy]; + + for (int index = 0; index < copy_cnt; index++) { + int layer_id = index * layers_per_bacth_copy; + size_t fail_index = 0; + uint32_t curr_index = 0; + uint32_t layer_cnt = 0; + + while (layer_id < (index + 1) * layers_per_bacth_copy && + layer_id < num_layers) { + auto dst_k_cache = kv_caches_ptr_->at(layer_id).get_k_cache(); + auto dst_v_cache = kv_caches_ptr_->at(layer_id).get_v_cache(); + auto dst_index_cache = 
kv_caches_ptr_->at(layer_id).get_index_cache(); + + for (const auto& info : block_transfer_info) { + auto src_k_cache = host_kv_caches_.at(info.src_block_id).get_k_cache(); + srcs[curr_index] = src_k_cache[layer_id].data_ptr(); + dsts[curr_index] = dst_k_cache[info.dst_block_id].data_ptr(); + copy_size[curr_index] = cache_size_per_layer_[0]; + curr_index++; + + if (cache_size_per_layer_[1] != 0) { + auto src_v_cache = + host_kv_caches_.at(info.src_block_id).get_v_cache(); + srcs[curr_index] = src_v_cache[layer_id].data_ptr(); + dsts[curr_index] = dst_v_cache[info.dst_block_id].data_ptr(); + copy_size[curr_index] = cache_size_per_layer_[1]; + curr_index++; + } + + if (cache_size_per_layer_[2] != 0) { + auto src_index_cache = + host_kv_caches_.at(info.src_block_id).get_index_cache(); + srcs[curr_index] = src_index_cache[layer_id].data_ptr(); + dsts[curr_index] = dst_index_cache[info.dst_block_id].data_ptr(); + copy_size[curr_index] = cache_size_per_layer_[2]; + curr_index++; + } + } + layer_id++; + layer_cnt++; + } + + ret = aclrtMemcpyBatch(dsts, + copy_size, + srcs, + copy_size, + num_batches * layer_cnt, + attrs, + attrs_indexes, + 1, + &fail_index); + + if (ret != 0 || fail_index != SIZE_MAX) { + LOG(ERROR) << "aclrtMemcpyBatch error: " << ret + << ", fail_index:" << fail_index; + } else { + auto* event = synchronizer->get_event(index); + ret = aclrtRecordEvent(*event, stream->get_stream()->stream()); + if (ret != 0) { + LOG(ERROR) << "aclrtRecordEvent error: " << ret; + } + } + auto* event_flag = synchronizer->get_event_flag(index); + event_flag->store(true, std::memory_order_release); + if (ret != 0) break; + } + + if (stream->synchronize() != 0) { + LOG(ERROR) << "h2d_batch_copy timeout!"; + copy_stream_.enqueue(std::move(stream)); + return false; + } + + copy_stream_.enqueue(std::move(stream)); + + delete[] dsts; + delete[] srcs; + delete[] copy_size; +#endif + return true; +} + +uint32_t HierarchyKVCacheTransfer::offload_to_store( + Slice& block_transfer_info) { + if (!options_.enable_kvcache_store()) { + return block_transfer_info.size(); + } + + return KVCacheStore::get_instance().batch_put(block_transfer_info); +} + +uint32_t HierarchyKVCacheTransfer::load_from_store( + Slice& block_transfer_info) { + if (!options_.enable_kvcache_store()) { + return 0; + } + return KVCacheStore::get_instance().batch_get(block_transfer_info); +} + +void HierarchyKVCacheTransfer::create_page_aligned_host_cache() { + CHECK(kv_caches_ptr_->size() > 0) << "hbm kv cache size should > 0."; + CHECK(options_.host_blocks_factor() > 1) << "host_blocks_factor should > 1."; + + std::vector> tensor_shapes = + kv_caches_ptr_->at(0).get_shapes(); + + CHECK(!tensor_shapes[0].empty()) << "k cache should not be empty!"; + +#if defined(USE_NPU) + int32_t device_id = device_.index(); + h2d_attrs_.dstLoc.id = device_id; + h2d_attrs_.dstLoc.type = aclrtMemLocationType::ACL_MEM_LOCATION_TYPE_DEVICE; + h2d_attrs_.srcLoc.id = device_id; + h2d_attrs_.srcLoc.type = aclrtMemLocationType::ACL_MEM_LOCATION_TYPE_HOST; + memset(h2d_attrs_.rsv, 0, 16); + + d2h_attrs_.dstLoc.id = device_id; + d2h_attrs_.dstLoc.type = aclrtMemLocationType::ACL_MEM_LOCATION_TYPE_HOST; + d2h_attrs_.srcLoc.id = device_id; + d2h_attrs_.srcLoc.type = aclrtMemLocationType::ACL_MEM_LOCATION_TYPE_DEVICE; + memset(d2h_attrs_.rsv, 0, 16); +#endif + + cache_size_per_layer_.resize(3, 0); + cache_size_per_layer_[0] = + kv_caches_ptr_->at(0).get_k_cache()[0].numel() * + kv_caches_ptr_->at(0).get_k_cache()[0].element_size(); + + if (!tensor_shapes[1].empty()) { + 
cache_size_per_layer_[1] = + kv_caches_ptr_->at(0).get_v_cache()[0].numel() * + kv_caches_ptr_->at(0).get_v_cache()[0].element_size(); + cache_tensor_cnt_++; + } + + if (!tensor_shapes[2].empty()) { + cache_size_per_layer_[2] = + kv_caches_ptr_->at(0).get_index_cache()[0].numel() * + kv_caches_ptr_->at(0).get_index_cache()[0].element_size(); + cache_tensor_cnt_++; + } + + auto dtype = kv_caches_ptr_->at(0).get_k_cache().dtype(); + uint64_t num_blocks = tensor_shapes[0][0] * options_.host_blocks_factor(); + std::vector cache_size_per_tensor(3, 0); + + for (size_t i = 0; i < tensor_shapes.size(); i++) { + if (!tensor_shapes[i].empty()) { + tensor_shapes[i][0] = options_.layers(); + cache_size_per_tensor[i] = cache_size_per_layer_[i] * options_.layers(); + page_aligned_data_size_ += num_blocks * cache_size_per_tensor[i]; + } + } + + size_t page_size = sysconf(_SC_PAGESIZE); + page_aligned_data_size_ = + ((page_aligned_data_size_ + page_size - 1) / page_size) * page_size; + + page_aligned_data_ = mmap(nullptr, + page_aligned_data_size_, + PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANONYMOUS, + -1, + 0); + + if (page_aligned_data_ == MAP_FAILED) { + LOG(FATAL) << "Failed to allocate aligned memory pool!"; + } + + if (mlock(page_aligned_data_, page_aligned_data_size_) != 0) { + munmap(page_aligned_data_, page_aligned_data_size_); + LOG(FATAL) << "Failed to lock memory pool!"; + } + +#if defined(USE_NPU) + auto ret = aclrtHostRegister(page_aligned_data_, + page_aligned_data_size_, + aclrtHostRegisterType::ACL_HOST_REGISTER_MAPPED, + &mapped_ptr_); + if (ret != 0) { + LOG(FATAL) << "aclrtHostRegister fail: " << ret; + } +#endif + + size_t current_offset = 0; + auto options = torch::TensorOptions().dtype(dtype).device(torch::kCPU); + host_kv_caches_.reserve(num_blocks); + + for (size_t i = 0; i < num_blocks; ++i) { + torch::Tensor key_cache, value_cache, index_cache; + void* k_tensor_ptr = + static_cast(page_aligned_data_) + current_offset; + key_cache = torch::from_blob(k_tensor_ptr, tensor_shapes[0], options); + current_offset += cache_size_per_tensor[0]; + + if (!tensor_shapes[1].empty()) { + void* v_tensor_ptr = + static_cast(page_aligned_data_) + current_offset; + value_cache = torch::from_blob(v_tensor_ptr, tensor_shapes[1], options); + current_offset += cache_size_per_tensor[1]; + } + + if (!tensor_shapes[2].empty()) { + void* index_tensor_ptr = + static_cast(page_aligned_data_) + current_offset; + index_cache = + torch::from_blob(index_tensor_ptr, tensor_shapes[2], options); + current_offset += cache_size_per_tensor[2]; + } + + host_kv_caches_.emplace_back(key_cache, value_cache, index_cache); + } + + LOG(INFO) << "host k cache shape: " + << host_kv_caches_[0].get_k_cache().sizes(); + LOG(INFO) << "host v cache shape: " + << host_kv_caches_[0].get_v_cache().sizes(); + LOG(INFO) << "host index cache shape: " + << host_kv_caches_[0].get_index_cache().sizes(); + + LOG(INFO) << "Host block init finish, total size: " << num_blocks; +} + +} // namespace xllm diff --git a/xllm/core/framework/kv_cache/hierarchy_kv_cache_transfer.h b/xllm/core/framework/kv_cache/hierarchy_kv_cache_transfer.h new file mode 100644 index 000000000..8f2073ffd --- /dev/null +++ b/xllm/core/framework/kv_cache/hierarchy_kv_cache_transfer.h @@ -0,0 +1,108 @@ +/* Copyright 2025 The xLLM Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + https://github.com/jd-opensource/xllm/blob/main/LICENSE + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#pragma once + +#include + +#include + +#include "common/types.h" +#include "framework/model/model_input_params.h" +#include "kv_cache.h" +#include "platform/device.h" +#include "util/threadpool.h" + +#if defined(USE_NPU) +#include "acl/acl_rt.h" +#include "platform/npu/npu_layer_synchronizer.h" +#endif + +namespace xllm { +class HierarchyKVCacheTransfer { + public: + struct Options { + PROPERTY(uint32_t, tp_rank); + PROPERTY(uint32_t, layers); + PROPERTY(double, host_blocks_factor) = 0.0; + PROPERTY(uint32_t, layers_wise_copy_batchs) = 1; + PROPERTY(bool, enable_kvcache_store) = false; + PROPERTY(std::string, store_protocol) = "tcp"; + PROPERTY(std::string, store_master_server_address) = ""; + PROPERTY(std::string, store_metadata_server) = ""; + PROPERTY(std::string, store_local_hostname) = ""; + }; + + HierarchyKVCacheTransfer(const Options& options, + const torch::Device& device, + std::vector* kv_caches_ptr); + ~HierarchyKVCacheTransfer(); + + uint32_t transfer_kv_blocks( + const uint64_t batch_id, + const std::vector& block_transfer_info); + + uint32_t transfer_kv_blocks(const uint64_t batch_id, + Slice& block_transfer_info); + + void set_layer_synchronizer(ModelInputParams& params); + + private: + void create_page_aligned_host_cache(); + + uint32_t offload_kv_blocks( + const std::vector& block_transfer_info); + + bool d2h_batch_copy(Slice& block_transfer_info); + bool h2d_batch_copy(const uint64_t batch_id, + Slice& block_transfer_info); + + uint32_t offload_to_store(Slice& block_transfer_info); + uint32_t load_from_store(Slice& block_transfer_info); + + private: + // options + Options options_; + // device to run the model on + Device device_; + + // working thread for data copy + std::unique_ptr h2d_threadpool_; + std::unique_ptr d2h_threadpool_; + // copy streams only can be used in h2d_threadpool_ and d2h_threadpool_ + moodycamel::BlockingConcurrentQueue> copy_stream_; + + std::vector* kv_caches_ptr_; + std::vector host_kv_caches_; + + void* page_aligned_data_ = nullptr; + size_t page_aligned_data_size_ = 0; + + std::vector cache_size_per_layer_; + uint32_t cache_tensor_cnt_ = 1; + +#if defined(USE_NPU) + void* mapped_ptr_ = nullptr; + + aclrtMemcpyBatchAttr h2d_attrs_; + aclrtMemcpyBatchAttr d2h_attrs_; + + mutable std::mutex mutex_; + std::unordered_map> + layer_wise_load_synchronizer_; +#endif +}; + +} // namespace xllm diff --git a/xllm/core/framework/kv_cache/kv_cache.cpp b/xllm/core/framework/kv_cache/kv_cache.cpp index 46022e861..161955a87 100644 --- a/xllm/core/framework/kv_cache/kv_cache.cpp +++ b/xllm/core/framework/kv_cache/kv_cache.cpp @@ -35,6 +35,41 @@ torch::Tensor KVCache::get_k_cache() const { return key_cache_; } torch::Tensor KVCache::get_v_cache() const { return value_cache_; } torch::Tensor KVCache::get_index_cache() const { return index_cache_; } +std::vector> KVCache::get_shapes() { + std::vector> tensor_shapes(3); + if (key_cache_.defined() && key_cache_.numel() != 0) { + std::vector shape; + auto sizes = key_cache_.sizes(); + shape.resize(sizes.size()); + for (int i 
= 0; i < sizes.size(); ++i) { + shape[i] = sizes[i]; + } + tensor_shapes[0] = std::move(shape); + } + + if (value_cache_.defined() && value_cache_.numel() != 0) { + std::vector shape; + auto sizes = value_cache_.sizes(); + shape.resize(sizes.size()); + for (int i = 0; i < sizes.size(); ++i) { + shape[i] = sizes[i]; + } + tensor_shapes[1] = std::move(shape); + } + + if (index_cache_.defined() && index_cache_.numel() != 0) { + std::vector shape; + auto sizes = index_cache_.sizes(); + shape.resize(sizes.size()); + for (int i = 0; i < sizes.size(); ++i) { + shape[i] = sizes[i]; + } + tensor_shapes[2] = std::move(shape); + } + + return tensor_shapes; +} + void KVCache::swap_blocks(torch::Tensor& src_tensor, torch::Tensor& dst_tensor) { // batch select keys and values diff --git a/xllm/core/framework/kv_cache/kv_cache.h b/xllm/core/framework/kv_cache/kv_cache.h index 2dda7701f..0efdc7a6a 100644 --- a/xllm/core/framework/kv_cache/kv_cache.h +++ b/xllm/core/framework/kv_cache/kv_cache.h @@ -40,6 +40,8 @@ class KVCache final { torch::Tensor get_v_cache() const; torch::Tensor get_index_cache() const; + std::vector> get_shapes(); + std::shared_ptr get_k_xtensor() const; std::shared_ptr get_v_xtensor() const; diff --git a/xllm/core/framework/kv_cache/kv_cache_event.h b/xllm/core/framework/kv_cache/kv_cache_event.h index 6e17462da..f08b69a29 100644 --- a/xllm/core/framework/kv_cache/kv_cache_event.h +++ b/xllm/core/framework/kv_cache/kv_cache_event.h @@ -25,13 +25,10 @@ struct KvCacheEvent { stored_cache; std::unordered_set removed_cache; - std::unordered_set - offload_cache; void clear() { stored_cache.clear(); removed_cache.clear(); - offload_cache.clear(); } }; diff --git a/xllm/core/framework/kv_cache/kv_cache_store.cpp b/xllm/core/framework/kv_cache/kv_cache_store.cpp index 7b0ffdeb0..1443bf952 100644 --- a/xllm/core/framework/kv_cache/kv_cache_store.cpp +++ b/xllm/core/framework/kv_cache/kv_cache_store.cpp @@ -43,16 +43,22 @@ bool KVCacheStore::init(const StoreConfig& config, } client_ptr_ = client_opt.value(); - auto k_tensor_one_block = host_kv_caches_->at(0).get_k_cache(); - auto v_tensor_one_block = host_kv_caches_->at(0).get_v_cache(); - - k_cache_size_per_block_ = - k_tensor_one_block.numel() * k_tensor_one_block.element_size(); - v_cache_size_per_block_ = - v_tensor_one_block.numel() * v_tensor_one_block.element_size(); + auto k_cache = host_kv_caches_->at(0).get_k_cache(); + k_cache_size_per_block_ = k_cache.numel() * k_cache.element_size(); + LOG(INFO) << "key cache size per block: " << k_cache_size_per_block_; + + auto v_cache = host_kv_caches_->at(0).get_v_cache(); + if (v_cache.defined() && v_cache.numel() != 0) { + v_cache_size_per_block_ = v_cache.numel() * v_cache.element_size(); + LOG(INFO) << "value cache size per block: " << v_cache_size_per_block_; + } - LOG(INFO) << "k_cache_size_per_block: " << k_cache_size_per_block_; - LOG(INFO) << "v_cache_size_per_block: " << v_cache_size_per_block_; + auto index_cache = host_kv_caches_->at(0).get_index_cache(); + if (index_cache.defined() && index_cache.numel() != 0) { + index_cache_size_per_block_ = + index_cache.numel() * index_cache.element_size(); + LOG(INFO) << "index cache size per block: " << index_cache_size_per_block_; + } if (config_.protocol == "rdma") { if (config_.total_size > 0 && config_.tensor_data != nullptr) { @@ -102,15 +108,8 @@ uint32_t KVCacheStore::batch_put( } str_keys.emplace_back(str_key); - - void* k_cache = - host_kv_caches_->at(block_info.dst_block_id).get_k_cache().data_ptr(); - void* v_cache = - 
host_kv_caches_->at(block_info.dst_block_id).get_k_cache().data_ptr(); - - slices.emplace_back(std::vector{ - mooncake::Slice{k_cache, k_cache_size_per_block_}, - mooncake::Slice{v_cache, v_cache_size_per_block_}}); + slices.emplace_back( + std::move(genarate_mooncake_slice(block_info.dst_block_id))); } if (str_keys.size() == 0) { @@ -149,17 +148,8 @@ uint32_t KVCacheStore::batch_get( } str_keys.emplace_back(str_key); - - void* k_cache = - host_kv_caches_->at(block_info.dst_block_id).get_k_cache().data_ptr(); - void* v_cache = - host_kv_caches_->at(block_info.dst_block_id).get_k_cache().data_ptr(); - - slices.insert( - std::make_pair(str_key, - std::vector{ - mooncake::Slice{k_cache, k_cache_size_per_block_}, - mooncake::Slice{v_cache, v_cache_size_per_block_}})); + slices.insert(std::make_pair( + str_key, std::move(genarate_mooncake_slice(block_info.dst_block_id)))); } if (str_keys.size() == 0) { @@ -177,24 +167,6 @@ uint32_t KVCacheStore::batch_get( return success_cnt; } -uint32_t KVCacheStore::batch_remove( - Slice& block_transfer_info) { - CHECK(is_initialized_) << "KVCacheStore is not initialized."; - uint32_t success_cnt = 0; - for (auto block_info : block_transfer_info) { - std::string str_key(reinterpret_cast(block_info.hash_key), - MURMUR_HASH3_VALUE_LEN); - str_key.append(std::to_string(config_.tp_rank)); - - auto result = client_ptr_->Remove(str_key); - - if (result.has_value()) { - success_cnt++; - } - } - return success_cnt; -} - uint32_t KVCacheStore::batch_exist(std::vector&& keys) { if (!is_initialized_) { return 0; @@ -210,4 +182,26 @@ uint32_t KVCacheStore::batch_exist(std::vector&& keys) { return ret; } +std::vector KVCacheStore::genarate_mooncake_slice( + int32_t block_id) { + std::vector slice; + slice.reserve(3); + + void* k_cache = host_kv_caches_->at(block_id).get_k_cache().data_ptr(); + slice.emplace_back(mooncake::Slice{k_cache, k_cache_size_per_block_}); + + if (v_cache_size_per_block_ != 0) { + void* v_cache = host_kv_caches_->at(block_id).get_v_cache().data_ptr(); + slice.emplace_back(mooncake::Slice{v_cache, v_cache_size_per_block_}); + } + + if (index_cache_size_per_block_ != 0) { + void* index_cache = + host_kv_caches_->at(block_id).get_index_cache().data_ptr(); + slice.emplace_back( + mooncake::Slice{index_cache, index_cache_size_per_block_}); + } + return slice; +} + } // namespace xllm diff --git a/xllm/core/framework/kv_cache/kv_cache_store.h b/xllm/core/framework/kv_cache/kv_cache_store.h index 8cf7b9cf9..cae74ae21 100644 --- a/xllm/core/framework/kv_cache/kv_cache_store.h +++ b/xllm/core/framework/kv_cache/kv_cache_store.h @@ -49,8 +49,6 @@ class KVCacheStore { uint32_t batch_get(Slice& block_transfer_info); - uint32_t batch_remove(Slice& block_transfer_info); - uint32_t batch_exist(std::vector&& keys); static KVCacheStore& get_instance() { @@ -63,6 +61,8 @@ class KVCacheStore { KVCacheStore(const KVCacheStore&) = delete; KVCacheStore& operator=(const KVCacheStore&) = delete; + std::vector genarate_mooncake_slice(int32_t block_id); + private: bool is_initialized_ = false; @@ -71,8 +71,9 @@ class KVCacheStore { std::vector* host_kv_caches_; - uint64_t k_cache_size_per_block_; - uint64_t v_cache_size_per_block_; + uint64_t k_cache_size_per_block_ = 0; + uint64_t v_cache_size_per_block_ = 0; + uint64_t index_cache_size_per_block_ = 0; std::shared_ptr client_ptr_; }; diff --git a/xllm/core/framework/model/model_input_params.h b/xllm/core/framework/model/model_input_params.h index 4dd0f4122..cd2f1eb40 100644 --- 
a/xllm/core/framework/model/model_input_params.h +++ b/xllm/core/framework/model/model_input_params.h @@ -31,7 +31,8 @@ namespace xllm { enum class TransferType : uint8_t { G2H = 0, // global memory(KVCache store) to host memory(DRAM) H2D = 1, // host memory(DRAM) to device memory(HBM) - D2G = 2 // host memory(DRAM) to global memory(KVCache store) + D2G = 2, // host memory(DRAM) to global memory(KVCache store) + G2D = 3 // global memory(KVCache store) to device memory(HBM) }; struct BlockTransferInfo { @@ -172,6 +173,19 @@ struct ModelInputParams { #endif } + bool synchronize_layer(uint32_t layer_idx) const { +#if defined(USE_NPU) + if (layer_wise_load_synchronizer != nullptr && + layer_idx % layers_per_bacth_copy == 0) { + if (!layer_wise_load_synchronizer->synchronize_layer( + layer_idx / layers_per_bacth_copy)) { + return false; + } + } +#endif + return true; + } + // whether the kv-cache is empty for all sequences. bool empty_kv_cache = true; BatchForwardType batch_forward_type; @@ -227,6 +241,7 @@ struct ModelInputParams { #if defined(USE_NPU) std::shared_ptr layer_synchronizer = nullptr; + uint32_t layers_per_bacth_copy = std::numeric_limits::max(); std::shared_ptr layer_wise_load_synchronizer = nullptr; #endif diff --git a/xllm/core/framework/prefix_cache/prefix_cache.h b/xllm/core/framework/prefix_cache/prefix_cache.h index 0da22c5f8..875f6d65a 100644 --- a/xllm/core/framework/prefix_cache/prefix_cache.h +++ b/xllm/core/framework/prefix_cache/prefix_cache.h @@ -89,10 +89,7 @@ class PrefixCache { } } - virtual KvCacheEvent* get_upload_kvcache_events() { - LOG(ERROR) << "Not implemented!"; - return nullptr; - } + virtual KvCacheEvent* get_upload_kvcache_events() { return nullptr; } static uint32_t compute_hash_keys(const Slice& token_ids, std::vector& blocks); diff --git a/xllm/core/framework/request/sequence.cpp b/xllm/core/framework/request/sequence.cpp index ce2048157..81c0f4e7a 100644 --- a/xllm/core/framework/request/sequence.cpp +++ b/xllm/core/framework/request/sequence.cpp @@ -44,7 +44,8 @@ Sequence::Sequence(size_t index, mm_data_(mm_data), latest_generate_time_(absl::Now()), sequence_params_(seq_params), - decoder_(std::move(decoder)) { + decoder_(std::move(decoder)), + termination_flag_(std::make_shared>(false)) { CHECK(!prompt_token_ids.empty()) << "empty prompt token ids"; auto capacity = sequence_params_.seq_capacity; CHECK_GT(capacity, prompt_token_ids.size()) << "capacity too small"; @@ -93,7 +94,8 @@ Sequence::Sequence(const Sequence& other) dp_rank_(other.dp_rank_), cur_generated_token_idx_(other.cur_generated_token_idx_), first_token_(other.first_token_), - is_pre_scheduled_step_prefill_(other.is_pre_scheduled_step_prefill_) { + is_pre_scheduled_step_prefill_(other.is_pre_scheduled_step_prefill_), + termination_flag_(std::make_shared>(false)) { logprob_state_ = std::make_unique(*other.logprob_state_); } @@ -381,6 +383,8 @@ void Sequence::add_host_kv_blocks(const std::vector& blocks) { void Sequence::reset() { kv_state_.reset(); host_kv_state_.reset(); + timer_.reset(); + is_timeout_set_ = false; volatile_num_prompt_tokens_ = num_tokens_; } @@ -455,12 +459,24 @@ Slice Sequence::get_generated_tokens() const { return {tokens_.data(), 0}; } -void Sequence::update_prefetch_result() { +bool Sequence::update_prefetch_result(uint32_t timeout) { if (prefetch_results_.empty()) { - return; + return true; + } + + if (timeout != 0 && !termination_flag_->load(std::memory_order_acquire)) { + if (!is_timeout_set_) { + timer_.reset(); + is_timeout_set_ = true; + return false; + } + 
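// Illustrative sketch (not from the patch): the timeout handling in
// Sequence::update_prefetch_result works in two phases. The first call after a
// prefetch is issued only arms a timer and reports "not ready"; later calls keep
// reporting "not ready" until either every worker has finished (the shared
// termination flag is set by the stream receiver) or prefetch_timeout milliseconds
// have elapsed, after which the flag is set to stop stragglers and the minimum
// per-rank success count is consumed. A condensed standalone version using
// std::chrono in place of xLLM's Timer; prefetch_ready is a hypothetical helper.
#include <atomic>
#include <chrono>
#include <cstdint>

inline bool prefetch_ready(std::atomic<bool>& terminated,
                           std::chrono::steady_clock::time_point& deadline,
                           bool& deadline_armed,
                           uint32_t timeout_ms) {
  if (timeout_ms != 0 && !terminated.load(std::memory_order_acquire)) {
    if (!deadline_armed) {  // first call: arm the deadline, keep waiting
      deadline = std::chrono::steady_clock::now() +
                 std::chrono::milliseconds(timeout_ms);
      deadline_armed = true;
      return false;
    }
    if (std::chrono::steady_clock::now() < deadline) return false;
  }
  // Either all workers reported or the deadline passed: stop in-flight prefetches.
  terminated.store(true, std::memory_order_release);
  return true;
}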
+ if (timer_.elapsed_milliseconds() < timeout) { + return false; + } } - termination_flag_.store(true, std::memory_order_release); + termination_flag_->store(true, std::memory_order_release); uint32_t success_cnt = host_kv_state_.kv_blocks().size(); for (auto& cnt : prefetch_results_) { success_cnt = std::min(success_cnt, cnt->load()); @@ -470,6 +486,7 @@ void Sequence::update_prefetch_result() { success_cnt * host_kv_state_.kv_blocks()[0].size()); } prefetch_results_.clear(); + return true; } } // namespace xllm diff --git a/xllm/core/framework/request/sequence.h b/xllm/core/framework/request/sequence.h index 3fad73a1a..d2f8c0d41 100644 --- a/xllm/core/framework/request/sequence.h +++ b/xllm/core/framework/request/sequence.h @@ -34,6 +34,7 @@ limitations under the License. #include "sequence_kv_state.h" #include "sequence_logprob_state.h" #include "stopping_checker.h" +#include "util/timer.h" namespace xllm { @@ -242,12 +243,14 @@ class Sequence final { const Tokenizer& tokenizer, std::optional>& out_logprobs); - const std::atomic& get_termination_flag() { return termination_flag_; } + std::shared_ptr> get_termination_flag() { + return termination_flag_; + } std::vector>>* get_prefetch_results() { return &prefetch_results_; } - void update_prefetch_result(); + bool update_prefetch_result(uint32_t timeout); void reset(); @@ -359,8 +362,11 @@ class Sequence final { std::atomic cancelled_{false}; // kvcache store copy async result - std::atomic termination_flag_{false}; + std::shared_ptr> termination_flag_; std::vector>> prefetch_results_; + + Timer timer_; + bool is_timeout_set_ = false; }; } // namespace xllm diff --git a/xllm/core/framework/xtensor/xtensor_manager_pool.h b/xllm/core/framework/xtensor/xtensor_manager_pool.h index a00ce0fa0..1bba771da 100644 --- a/xllm/core/framework/xtensor/xtensor_manager_pool.h +++ b/xllm/core/framework/xtensor/xtensor_manager_pool.h @@ -43,10 +43,6 @@ class XTensorManagerPool final : public KVCacheManager { LOG(FATAL) << "cache is not implemented for page manager pool"; } - uint32_t pre_allocate(Sequence* sequence) override { - LOG(FATAL) << "pre_allocate is not implemented for page manager pool"; - } - std::vector allocate(size_t num_tokens, int32_t& dp_rank) override { LOG(FATAL) << "allocate is not implemented for page manager pool"; } @@ -55,31 +51,12 @@ class XTensorManagerPool final : public KVCacheManager { LOG(FATAL) << "allocate_shared is not implemented for page manager pool"; } - std::vector>* - get_offload_block_transfer_infos() override { - LOG(FATAL) - << "get_offload_block_transfer_infos is not implemented for page " - "manager pool"; - } - - std::vector>* get_load_block_transfer_infos() - override { - LOG(FATAL) << "get_load_block_transfer_infos is not implemented for page " - "manager pool"; - } - std::vector>* get_swap_block_transfer_infos() override { LOG(FATAL) << "get_swap_block_transfer_infos is not implemented for page " "manager pool"; } - void postprocess_offload( - std::vector>>& futures) override { - LOG(FATAL) << "postprocess_offload is not implemented for page " - "manager pool"; - } - void reset_transfer_infos() override { LOG(FATAL) << "reset_transfer_infos is not implemented for page manager pool"; diff --git a/xllm/core/kernels/npu/xllm_ops/top_k_top_p.h b/xllm/core/kernels/npu/xllm_ops/top_k_top_p.h index 3a9d1334f..0baadea6a 100644 --- a/xllm/core/kernels/npu/xllm_ops/top_k_top_p.h +++ b/xllm/core/kernels/npu/xllm_ops/top_k_top_p.h @@ -20,7 +20,7 @@ limitations under the License. 
#include #include "acl/acl.h" -#include "aclnn_apply_top_k_top_p.h" +#include "aclnnop/aclnn_apply_top_k_top_p.h" #include "acltensor_utils.h" #include "util/tensor_helper.h" diff --git a/xllm/core/platform/npu/npu_layer_synchronizer.h b/xllm/core/platform/npu/npu_layer_synchronizer.h index 6bf957b10..44f956430 100644 --- a/xllm/core/platform/npu/npu_layer_synchronizer.h +++ b/xllm/core/platform/npu/npu_layer_synchronizer.h @@ -31,6 +31,7 @@ class NPULayerSynchronizerImpl { aclrtEvent* get_event(const int64_t layer_index); std::atomic* get_event_flag(const int64_t layer_index); bool synchronize_layer(const int64_t layer_index); + uint32_t get_event_size() { return events_.size(); }; private: std::vector events_; diff --git a/xllm/core/runtime/engine.h b/xllm/core/runtime/engine.h index 54e671ee0..1f2a285ed 100644 --- a/xllm/core/runtime/engine.h +++ b/xllm/core/runtime/engine.h @@ -96,8 +96,8 @@ class Engine { virtual void prefetch_from_storage( const uint32_t dp_rank, - const std::atomic& flag, const std::vector& block_transfer_info, + std::shared_ptr> flag, std::vector>>* prefetch_results) { LOG(FATAL) << " prefetch_from_storage is not implemented!"; }; diff --git a/xllm/core/runtime/llm_engine.cpp b/xllm/core/runtime/llm_engine.cpp index c8077389c..76b523e9c 100644 --- a/xllm/core/runtime/llm_engine.cpp +++ b/xllm/core/runtime/llm_engine.cpp @@ -30,6 +30,7 @@ limitations under the License. #include "common/global_flags.h" #include "common/interruption_bus.h" #include "common/metrics.h" +#include "framework/block/hierarchy_block_manager_pool.h" #include "framework/model/model_args.h" #include "framework/model_loader.h" #include "framework/xtensor/multi_layer_xtensor_transfer.h" @@ -346,7 +347,12 @@ bool LLMEngine::allocate_kv_cache(const Engine::KVCacheCapacity& kv_cache_cap) { .enable_disagg_pd(options_.enable_disagg_pd()) .enable_cache_upload(options_.enable_cache_upload()) .enable_kvcache_store(options_.enable_kvcache_store()); - kv_cache_manager_ = std::make_unique(options, dp_size_); + if (options_.host_blocks_factor() > 1.0 || options_.enable_kvcache_store()) { + kv_cache_manager_ = + std::make_unique(options, this, dp_size_); + } else { + kv_cache_manager_ = std::make_unique(options, dp_size_); + } // init kv cache for each worker in parallel std::vector> futures; @@ -511,15 +517,15 @@ void LLMEngine::transfer_kv_blocks( void LLMEngine::prefetch_from_storage( const uint32_t dp_rank, - const std::atomic& flag, const std::vector& block_transfer_info, + std::shared_ptr> flag, std::vector>>* prefetch_results) { - prefetch_results->resize(dp_local_tp_size_, - std::make_shared>(0)); + prefetch_results->reserve(dp_local_tp_size_); for (auto tp_rank = 0; tp_rank < dp_local_tp_size_; ++tp_rank) { + prefetch_results->emplace_back(std::make_shared>(0)); worker_clients_[tp_rank + dp_local_tp_size_ * dp_rank] ->prefetch_from_storage( - flag, block_transfer_info, prefetch_results->at(tp_rank)); + block_transfer_info, flag, prefetch_results->at(tp_rank)); } } diff --git a/xllm/core/runtime/llm_engine.h b/xllm/core/runtime/llm_engine.h index 10681d01f..666954de9 100644 --- a/xllm/core/runtime/llm_engine.h +++ b/xllm/core/runtime/llm_engine.h @@ -82,8 +82,8 @@ class LLMEngine : public Engine { void prefetch_from_storage( const uint32_t dp_rank, - const std::atomic& flag, const std::vector& block_transfer_info, + std::shared_ptr> flag, std::vector>>* prefetch_results) override; diff --git a/xllm/core/runtime/llm_master.cpp b/xllm/core/runtime/llm_master.cpp index ca78d7e1b..fdb4e7bfd 100644 --- 
a/xllm/core/runtime/llm_master.cpp +++ b/xllm/core/runtime/llm_master.cpp @@ -96,7 +96,8 @@ LLMMaster::LLMMaster(const Options& options) .enable_forward_interruption(options_.enable_forward_interruption()) .max_global_ttft_ms(options_.max_global_ttft_ms()) .max_global_tpot_ms(options_.max_global_tpot_ms()) - .server_idx(options_.server_idx()); + .server_idx(options_.server_idx()) + .prefetch_timeout(options_.prefetch_timeout()); scheduler_ = create_continuous_scheduler(engine_.get(), scheduler_options); if (options_.enable_service_routing()) { diff --git a/xllm/core/runtime/master.cpp b/xllm/core/runtime/master.cpp index d33934282..34e791f8c 100644 --- a/xllm/core/runtime/master.cpp +++ b/xllm/core/runtime/master.cpp @@ -218,6 +218,8 @@ Master::Master(const Options& options, EngineType type) : options_(options) { .store_master_server_address(options_.store_master_server_address()) .store_metadata_server(options_.store_metadata_server()) .store_local_hostname(options_.store_local_hostname()) + .prefetch_bacth_size(options_.prefetch_bacth_size()) + .layers_wise_copy_batchs(options_.layers_wise_copy_batchs()) .enable_continuous_kvcache(options_.enable_continuous_kvcache()) .enable_offline_inference(options_.enable_offline_inference()) .spawn_worker_path(options_.spawn_worker_path()) diff --git a/xllm/core/runtime/options.h b/xllm/core/runtime/options.h index e2e68baf9..41ddf4b1e 100644 --- a/xllm/core/runtime/options.h +++ b/xllm/core/runtime/options.h @@ -158,6 +158,12 @@ struct Options { // value used if port is not included) PROPERTY(std::string, store_local_hostname) = ""; + // Prefetch from kvcache store copy batch size + PROPERTY(uint32_t, prefetch_bacth_size) = 2; + + // Layer wise H2D copy batchs + PROPERTY(uint32_t, layers_wise_copy_batchs) = 4; + // dit // max requests per batch PROPERTY(int, max_requests_per_batch) = 0; diff --git a/xllm/core/runtime/worker.cpp b/xllm/core/runtime/worker.cpp index 4ec33fbcf..d4d0124b1 100644 --- a/xllm/core/runtime/worker.cpp +++ b/xllm/core/runtime/worker.cpp @@ -166,19 +166,15 @@ folly::SemiFuture Worker::pull_kv_blocks_async( } uint32_t Worker::transfer_kv_blocks( - const std::vector& block_transfer_info) { - return impl_->transfer_kv_blocks(block_transfer_info); -} - -void Worker::transfer_kv_blocks( const uint64_t batch_id, const std::vector& block_transfer_info) { - impl_->transfer_kv_blocks(batch_id, std::move(block_transfer_info)); + return impl_->transfer_kv_blocks(batch_id, std::move(block_transfer_info)); } -uint32_t Worker::prefetch_from_storage( +uint32_t Worker::transfer_kv_blocks( + const uint64_t batch_id, Slice& block_transfer_info) { - return impl_->prefetch_from_storage(block_transfer_info); + return impl_->transfer_kv_blocks(batch_id, block_transfer_info); } const torch::Device& Worker::device() const { return impl_->device(); } diff --git a/xllm/core/runtime/worker.h b/xllm/core/runtime/worker.h index e7f711950..bfb7f496a 100644 --- a/xllm/core/runtime/worker.h +++ b/xllm/core/runtime/worker.h @@ -107,13 +107,11 @@ class Worker { const std::vector& dst_blocks); virtual uint32_t transfer_kv_blocks( - const std::vector& block_transfer_info); - - virtual void transfer_kv_blocks( const uint64_t batch_id, const std::vector& block_transfer_info); - virtual uint32_t prefetch_from_storage( + virtual uint32_t transfer_kv_blocks( + const uint64_t batch_id, Slice& block_transfer_info); // Run the model on the given input. 
async call diff --git a/xllm/core/runtime/worker_client.cpp b/xllm/core/runtime/worker_client.cpp index 8a115fac2..b047ed75b 100644 --- a/xllm/core/runtime/worker_client.cpp +++ b/xllm/core/runtime/worker_client.cpp @@ -166,16 +166,16 @@ folly::SemiFuture WorkerClient::transfer_kv_blocks( } void WorkerClient::prefetch_from_storage( - const std::atomic& flag, const std::vector& block_transfer_info, - std::shared_ptr>& success_cnt) { + std::shared_ptr> flag, + std::shared_ptr> success_cnt) { LOG(FATAL) << "WorkerClient Method prefetch_from_storage is UnImplemented."; } void WorkerClient::transfer_kv_blocks( const uint64_t batch_id, const std::vector& block_transfer_info) { - worker_->transfer_kv_blocks(batch_id, block_transfer_info); + LOG(FATAL) << "WorkerClient Method transfer_kv_blocks is UnImplemented."; } const torch::Device& WorkerClient::device() const { return worker_->device(); } diff --git a/xllm/core/runtime/worker_client.h b/xllm/core/runtime/worker_client.h index d391a3630..842bb46ee 100644 --- a/xllm/core/runtime/worker_client.h +++ b/xllm/core/runtime/worker_client.h @@ -117,9 +117,9 @@ class WorkerClient { const std::vector& block_transfer_info); virtual void prefetch_from_storage( - const std::atomic& flag, const std::vector& block_transfer_info, - std::shared_ptr>& success_cnt); + std::shared_ptr> flag, + std::shared_ptr> success_cnt); // Run the model on the given input. async call // the future returns a successfull status with no meaningful value diff --git a/xllm/core/runtime/worker_impl.cpp b/xllm/core/runtime/worker_impl.cpp index 2ca022086..07bfca726 100644 --- a/xllm/core/runtime/worker_impl.cpp +++ b/xllm/core/runtime/worker_impl.cpp @@ -75,14 +75,6 @@ WorkerImpl::WorkerImpl(const ParallelArgs& parallel_args, device_.set_device(); device_.init_device_context(); threadpool_.schedule([this]() mutable { device_.set_device(); }); - h2d_threadpool_ = std::make_unique( - 2, [this]() mutable { device_.set_device(); }); - d2h_threadpool_ = std::make_unique( - 5, [this]() mutable { device_.set_device(); }); - for (int i = 0; i < h2d_threadpool_->size() + d2h_threadpool_->size(); i++) { - copy_stream_.enqueue(device_.get_stream_from_pool(TIMEOUT_MS)); - } - prepare_stream_ = device_.get_stream_from_pool(); sampler_ = std::make_unique(); } @@ -123,81 +115,11 @@ bool WorkerImpl::allocate_kv_cache( kv_caches_.emplace_back(key_cache, value_cache, index_cache); } - key_cache_size_per_layer_ = kv_caches_[0].get_k_cache()[0].numel() * - kv_caches_[0].get_k_cache()[0].element_size(); - // make sure value cache is not empty - if (!kv_cache_shape[1].empty()) { - value_cache_size_per_layer_ = kv_caches_[0].get_v_cache()[0].numel() * - kv_caches_[0].get_v_cache()[0].element_size(); - } - - allocate_host_kv_cache(kv_cache_shape); + init_hierarchy_kv_cache_transfer(); status_ = Status::READY; return true; } -bool WorkerImpl::allocate_host_kv_cache( - const std::vector>& device_kv_cache_shape) { - if (options_.host_blocks_factor() <= 0.00001) { - return true; - } -#if defined(USE_NPU) - CHECK(model_ != nullptr) << "Model is not initialized."; - CHECK(host_kv_caches_.empty()) << "KV caches are already initialized."; - CHECK(device_kv_cache_shape[0][0] == device_kv_cache_shape[1][0]); - - std::vector> host_kv_cache_shape = device_kv_cache_shape; - const int64_t num_layers = context_.get_model_args().n_layers(); - int64_t host_bolck_size = - device_kv_cache_shape[0][0] * options_.host_blocks_factor(); - host_kv_cache_shape[0][0] = num_layers; - CHECK(!host_kv_cache_shape[1].empty()) - << "v 
cache shape should not be empty!"; - // TODO(kangmeng3): support mlu kvcache - host_kv_cache_shape[1][0] = num_layers; - - // create a KVCache shape: block_size * [layers, token, head, dim] - aligned_tensor_creater_ = std::make_unique( - host_kv_cache_shape, dtype_, host_bolck_size, &host_kv_caches_); - - LOG(INFO) << "Initializing host kv block size: " << host_bolck_size; - - int32_t device_id = device_.index(); - h2d_attrs_.dstLoc.id = device_id; - h2d_attrs_.dstLoc.type = aclrtMemLocationType::ACL_MEM_LOCATION_TYPE_DEVICE; - h2d_attrs_.srcLoc.id = device_id; - h2d_attrs_.srcLoc.type = aclrtMemLocationType::ACL_MEM_LOCATION_TYPE_HOST; - memset(h2d_attrs_.rsv, 0, 16); - - d2h_attrs_.dstLoc.id = device_id; - d2h_attrs_.dstLoc.type = aclrtMemLocationType::ACL_MEM_LOCATION_TYPE_HOST; - d2h_attrs_.srcLoc.id = device_id; - d2h_attrs_.srcLoc.type = aclrtMemLocationType::ACL_MEM_LOCATION_TYPE_DEVICE; - memset(d2h_attrs_.rsv, 0, 16); - - if (options_.enable_kvcache_store()) { - StoreConfig config; - config.localhost_name = options_.store_local_hostname(); - config.protocol = options_.store_protocol(); - config.metadata_server = options_.store_metadata_server(); - config.master_server_address = options_.store_master_server_address(); - config.tp_rank = options_.dp_size() > 1 - ? options_.node_rank() % options_.dp_size() - : options_.node_rank(); - config.total_size = aligned_tensor_creater_->get_total_size(); - config.tensor_data = aligned_tensor_creater_->get_base_ptr(); - - if (!KVCacheStore::get_instance().init(config, &host_kv_caches_)) { - LOG(ERROR) << "Init KVCacheStore fail!"; - return false; - } - } - - status_ = Status::READY; -#endif - return true; -} - bool WorkerImpl::allocate_continuous_kv_cache( const std::vector& options) { CHECK(model_ != nullptr) << "Model is not initialized."; @@ -261,7 +183,8 @@ bool WorkerImpl::allocate_kv_cache_with_transfer( } #endif - allocate_host_kv_cache(kv_cache_shape); + init_hierarchy_kv_cache_transfer(); + status_ = Status::READY; return true; } @@ -286,7 +209,7 @@ bool WorkerImpl::allocate_kv_cache_with_transfer( kv_caches_, num_layers, kv_cache_shape, dtype_); } - allocate_host_kv_cache(kv_cache_shape); + init_hierarchy_kv_cache_transfer(); status_ = Status::READY; return true; } @@ -491,18 +414,10 @@ folly::SemiFuture> WorkerImpl::step_async( threadpool_.schedule([this, input = std::move(input_on_device), promise = std::move(promise)]() mutable { -#if defined(USE_NPU) - { - std::lock_guard lock(mutex_); - if (layer_wise_load_synchronizer_.count(input.input_params.batch_id) != - 0) { - input.input_params.layer_wise_load_synchronizer = std::move( - layer_wise_load_synchronizer_[input.input_params.batch_id]); - layer_wise_load_synchronizer_.erase(input.input_params.batch_id); - } + if (hierarchy_kv_cache_transfer_ != nullptr) { + hierarchy_kv_cache_transfer_->set_layer_synchronizer(input.input_params); } -#endif // run the model on the given input in working thread if (!enable_schedule_overlap()) { const auto output = this->step(input); @@ -702,39 +617,17 @@ folly::SemiFuture WorkerImpl::pull_kv_blocks_async( } uint32_t WorkerImpl::transfer_kv_blocks( + const uint64_t batch_id, const std::vector& block_transfer_info) { - CHECK(!block_transfer_info.empty()); - - switch (block_transfer_info[0].transfer_type) { - case TransferType::D2G: - return offload_kv_blocks(block_transfer_info); - default: - LOG(ERROR) << "Unsupport copy type: " - << uint32_t(block_transfer_info[0].transfer_type); - return 0; - } + return 
hierarchy_kv_cache_transfer_->transfer_kv_blocks( + batch_id, std::move(block_transfer_info)); } -void WorkerImpl::transfer_kv_blocks( +uint32_t WorkerImpl::transfer_kv_blocks( const uint64_t batch_id, - const std::vector& block_transfer_info) { - CHECK(!block_transfer_info.empty()); - h2d_threadpool_->schedule( - [this, - batch_id = batch_id, - block_transfer_info = std::move(block_transfer_info)]() mutable { - switch (block_transfer_info[0].transfer_type) { - case TransferType::H2D: { - Slice info_slice{block_transfer_info}; - h2d_batch_copy(batch_id, info_slice); - break; - } - default: - LOG(ERROR) << "Unsupport copy type: " - << uint32_t(block_transfer_info[0].transfer_type); - break; - } - }); + Slice& block_transfer_info) { + return hierarchy_kv_cache_transfer_->transfer_kv_blocks(batch_id, + block_transfer_info); } folly::SemiFuture WorkerImpl::allocate_kv_cache_with_transfer_async( @@ -759,308 +652,24 @@ int64_t WorkerImpl::get_active_activation_memory() { .active_activation_memory; } -// TODO(kangmeng): abstract this code(and the code below) into a new class here. -uint32_t WorkerImpl::offload_kv_blocks( - const std::vector& block_transfer_info) { - if (block_transfer_info.empty()) { - return 0; +void WorkerImpl::init_hierarchy_kv_cache_transfer() { + if (options_.host_blocks_factor() > 1 || options_.enable_kvcache_store()) { + HierarchyKVCacheTransfer::Options transfer_options; + transfer_options + .tp_rank(options_.dp_size() > 1 + ? options_.node_rank() % options_.dp_size() + : options_.node_rank()) + .layers(context_.get_model_args().n_layers()) + .host_blocks_factor(options_.host_blocks_factor()) + .layers_wise_copy_batchs(options_.layers_wise_copy_batchs()) + .enable_kvcache_store(options_.enable_kvcache_store()) + .store_protocol(options_.store_protocol()) + .store_master_server_address(options_.store_master_server_address()) + .store_metadata_server(options_.store_metadata_server()) + .store_local_hostname(options_.store_local_hostname()); + hierarchy_kv_cache_transfer_ = std::make_unique( + transfer_options, device_, &kv_caches_); } - - const int64_t num_layers = context_.get_model_args().n_layers(); - uint32_t max_blocks_per_batch = BATCH_COPY_MAX_SIZE / (2 * num_layers); - uint32_t total_slice = - block_transfer_info.size() / max_blocks_per_batch + - uint32_t(block_transfer_info.size() % max_blocks_per_batch != 0); - - Slice transfer_info_slice(block_transfer_info); - std::vector> futures; - futures.reserve(total_slice); - - for (size_t i = 0; i < block_transfer_info.size(); - i += max_blocks_per_batch) { - folly::Promise promise; - auto future = promise.getSemiFuture(); - auto slice = transfer_info_slice.slice( - i, std::min(i + max_blocks_per_batch, block_transfer_info.size())); - - d2h_threadpool_->schedule([this, - promise = std::move(promise), - slice = std::move(slice)]() mutable { - bool ret = d2h_batch_copy(slice); - auto success_cnt = offload_to_store(slice); - if (success_cnt != slice.size()) { - LOG(WARNING) << "KVCacheStore not all put success: " << success_cnt - << "/" << slice.size(); - } - promise.setValue(ret); - }); - - futures.emplace_back(std::move(future)); - } - - if (!futures.empty()) { - try { - // TODO(kangmeng): add timeout - auto all_results = folly::collect(futures).get(); - if (!std::all_of(all_results.begin(), all_results.end(), [](bool result) { - return result; - })) { - LOG(FATAL) << "Not all D2H copy returned true"; - } - } catch (const std::exception& e) { - LOG(FATAL) << "Future execution failed: " << e.what(); - } - } - - return 
block_transfer_info.size(); } -bool WorkerImpl::d2h_batch_copy(Slice& block_transfer_info) { -#if defined(USE_NPU) - const int64_t num_layers = context_.get_model_args().n_layers(); - uint32_t num_batches = block_transfer_info.size() * num_layers * 2; - void** srcs = new void*[num_batches]; - void** dsts = new void*[num_batches]; - size_t* copy_size = new size_t[num_batches]; - aclrtMemcpyBatchAttr attrs[1] = {d2h_attrs_}; - size_t attrs_indexes[1] = {0}; - size_t fail_index; - uint32_t curr_index = 0; - - for (const auto& info : block_transfer_info) { - auto dst_k_cache = host_kv_caches_.at(info.dst_block_id).get_k_cache(); - auto dst_v_cache = host_kv_caches_.at(info.dst_block_id).get_v_cache(); - - for (int layer_id = 0; layer_id < num_layers; layer_id++) { - auto src_k_cache = kv_caches_.at(layer_id).get_k_cache(); - auto src_v_cache = kv_caches_.at(layer_id).get_v_cache(); - - srcs[curr_index] = src_k_cache[info.src_block_id].data_ptr(); - dsts[curr_index] = dst_k_cache[layer_id].data_ptr(); - copy_size[curr_index] = key_cache_size_per_layer_; - - curr_index++; - - srcs[curr_index] = src_v_cache[info.src_block_id].data_ptr(); - dsts[curr_index] = dst_v_cache[layer_id].data_ptr(); - copy_size[curr_index] = value_cache_size_per_layer_; - - curr_index++; - } - } - - std::unique_ptr stream; - copy_stream_.wait_dequeue(stream); - c10::StreamGuard streamGuard = stream->set_stream_guard(); - - // TODO(kangmeng): change to async API - aclError ret = aclrtMemcpyBatch(dsts, - copy_size, - srcs, - copy_size, - num_batches, - attrs, - attrs_indexes, - 1, - &fail_index); - if (ret != 0 || fail_index != SIZE_MAX) { - LOG(ERROR) << "aclrtMemcpyBatch error: " << ret - << ", fail_index:" << fail_index; - copy_stream_.enqueue(std::move(stream)); - return false; - } - - if (stream->synchronize() != 0) { - LOG(ERROR) << "d2h_batch_copy timeout!"; - copy_stream_.enqueue(std::move(stream)); - return false; - } - - copy_stream_.enqueue(std::move(stream)); - - delete[] dsts; - delete[] srcs; - delete[] copy_size; - -#endif - return true; -} - -bool WorkerImpl::h2d_batch_copy(const uint64_t batch_id, - Slice& block_transfer_info) { -#if defined(USE_NPU) - CHECK(block_transfer_info.size() < BATCH_COPY_MAX_SIZE / 2) - << "h2d_batch_copy support copy blocks less than " - << BATCH_COPY_MAX_SIZE / 2 << ", but got " << block_transfer_info.size(); - - if (block_transfer_info.empty()) { - return true; - } - - const int64_t num_layers = context_.get_model_args().n_layers(); - uint32_t num_batches = block_transfer_info.size() * 2; - - auto synchronizer = std::make_shared(num_layers); - { - std::lock_guard lock(mutex_); - if (layer_wise_load_synchronizer_.count(batch_id) != 0) { - LOG(FATAL) << "Batch id already exists!"; - } - layer_wise_load_synchronizer_[batch_id] = synchronizer; - } - - void** srcs = new void*[num_batches]; - void** dsts = new void*[num_batches]; - size_t* copy_size = new size_t[num_batches]; - aclrtMemcpyBatchAttr attrs[1] = {h2d_attrs_}; - size_t attrs_indexes[1] = {0}; - - std::unique_ptr stream; - copy_stream_.wait_dequeue(stream); - c10::StreamGuard streamGuard = stream->set_stream_guard(); - - aclError ret = 0; - - for (int layer_id = 0; layer_id < num_layers; layer_id++) { - auto dst_k_cache = kv_caches_.at(layer_id).get_k_cache(); - auto dst_v_cache = kv_caches_.at(layer_id).get_v_cache(); - size_t fail_index = 0; - uint32_t curr_index = 0; - auto* event = synchronizer->get_event(layer_id); - auto* event_flag = synchronizer->get_event_flag(layer_id); - - for (const auto& info : 
block_transfer_info) { - auto src_k_cache = host_kv_caches_.at(info.src_block_id).get_k_cache(); - auto src_v_cache = host_kv_caches_.at(info.src_block_id).get_v_cache(); - - srcs[curr_index] = src_k_cache[layer_id].data_ptr(); - dsts[curr_index] = dst_k_cache[info.dst_block_id].data_ptr(); - copy_size[curr_index] = key_cache_size_per_layer_; - curr_index++; - - srcs[curr_index] = src_v_cache[layer_id].data_ptr(); - dsts[curr_index] = dst_v_cache[info.dst_block_id].data_ptr(); - copy_size[curr_index] = value_cache_size_per_layer_; - curr_index++; - } - - // TODO(kangmeng): change to async API - ret = aclrtMemcpyBatch(dsts, - copy_size, - srcs, - copy_size, - num_batches, - attrs, - attrs_indexes, - 1, - &fail_index); - - if (ret != 0 || fail_index != SIZE_MAX) { - LOG(ERROR) << "aclrtMemcpyBatch error: " << ret - << ", fail_index:" << fail_index; - } else { - ret = aclrtRecordEvent(*event, stream->get_stream()->stream()); - if (ret != 0) { - LOG(ERROR) << "aclrtRecordEvent error: " << ret; - } - } - event_flag->store(true, std::memory_order_release); - if (ret != 0) break; - } - - if (stream->synchronize() != 0) { - LOG(ERROR) << "h2d_batch_copy timeout!"; - copy_stream_.enqueue(std::move(stream)); - return false; - } - copy_stream_.enqueue(std::move(stream)); - - delete[] dsts; - delete[] srcs; - delete[] copy_size; - -#endif - return true; -} - -uint32_t WorkerImpl::offload_to_store( - Slice& block_transfer_info) { - if (!options_.enable_kvcache_store()) { - return block_transfer_info.size(); - } - - return KVCacheStore::get_instance().batch_put(block_transfer_info); -} - -uint32_t WorkerImpl::prefetch_from_storage( - Slice& block_transfer_info) { - if (!options_.enable_kvcache_store()) { - return 0; - } - return KVCacheStore::get_instance().batch_get(block_transfer_info); -} - -AlignedTensorCreater::AlignedTensorCreater( - const std::vector>& tensor_shapes, - const torch::ScalarType dtype, - const uint32_t num_tensors, - std::vector* tensors) { - CHECK(tensor_shapes.size() == 2) - << "tensor_shapes.size() must equal to 2, but got " - << tensor_shapes.size(); - - int64_t elements_per_k_tensor = 1; - int64_t elements_per_v_tensor = 1; - - for (auto dim : tensor_shapes[0]) { - elements_per_k_tensor *= dim; - } - for (auto dim : tensor_shapes[1]) { - elements_per_v_tensor *= dim; - } - - size_t element_size = torch::elementSize(dtype); - size_t bytes_per_k_tensor = elements_per_k_tensor * element_size; - size_t bytes_per_v_tensor = elements_per_v_tensor * element_size; - size_t page_size = sysconf(_SC_PAGESIZE); - total_size_ = num_tensors * (bytes_per_k_tensor + bytes_per_v_tensor); - total_size_ = ((total_size_ + page_size - 1) / page_size) * page_size; - - base_ptr_ = mmap(nullptr, - total_size_, - PROT_READ | PROT_WRITE, - MAP_PRIVATE | MAP_ANONYMOUS, - -1, - 0); - - if (base_ptr_ == MAP_FAILED) { - LOG(FATAL) << "Failed to allocate aligned memory pool!"; - } - - if (mlock(base_ptr_, total_size_) != 0) { - munmap(base_ptr_, total_size_); - LOG(FATAL) << "Failed to lock memory pool!"; - } - - size_t current_offset = 0; - auto options = torch::TensorOptions().dtype(dtype).device(torch::kCPU); - tensors->reserve(num_tensors); - - for (size_t i = 0; i < num_tensors; ++i) { - void* k_tensor_ptr = static_cast(base_ptr_) + current_offset; - torch::Tensor k_tensor = - torch::from_blob(k_tensor_ptr, tensor_shapes[0], options); - current_offset += bytes_per_k_tensor; - - void* v_tensor_ptr = static_cast(base_ptr_) + current_offset; - torch::Tensor v_tensor = - torch::from_blob(v_tensor_ptr, 
tensor_shapes[1], options); - current_offset += bytes_per_v_tensor; - - tensors->emplace_back(k_tensor, v_tensor); - } - - LOG(INFO) << "Page aligned: " - << ((uintptr_t)base_ptr_ % page_size == 0 ? "YES" : "NO"); -} } // namespace xllm diff --git a/xllm/core/runtime/worker_impl.h b/xllm/core/runtime/worker_impl.h index 105abfd6c..5e2f2cf86 100644 --- a/xllm/core/runtime/worker_impl.h +++ b/xllm/core/runtime/worker_impl.h @@ -30,6 +30,7 @@ limitations under the License. #include "framework/kv_cache/llm_data_dist_transfer.h" #endif #include "framework/eplb/eplb_executor.h" +#include "framework/kv_cache/hierarchy_kv_cache_transfer.h" #include "framework/kv_cache/kv_cache_store.h" #include "framework/model/causal_lm.h" #include "framework/model/embedding_lm.h" @@ -46,8 +47,6 @@ limitations under the License. namespace xllm { -class AlignedTensorCreater; - class WorkerImpl { public: enum Status : int8_t { @@ -76,9 +75,6 @@ class WorkerImpl { virtual bool allocate_kv_cache( const std::vector>& kv_cache_shape); - virtual bool allocate_host_kv_cache( - const std::vector>& kv_cache_shape); - virtual bool allocate_kv_cache_with_transfer( uint64_t kv_cache_size, const std::vector>& kv_cache_shape); @@ -150,11 +146,12 @@ class WorkerImpl { const std::vector& dst_blocks); virtual uint32_t transfer_kv_blocks( + const uint64_t batch_id, const std::vector& block_transfer_info); - virtual void transfer_kv_blocks( + virtual uint32_t transfer_kv_blocks( const uint64_t batch_id, - const std::vector& block_transfer_info); + Slice& block_transfer_info); // Run the model on the given input. async call // the future returns a successfull status with no meaningful value @@ -183,20 +180,10 @@ class WorkerImpl { Status get_status() const { return status_; } - virtual uint32_t prefetch_from_storage( - Slice& block_transfer_info); - private: void update_last_step_output(const std::optional& output); - uint32_t offload_kv_blocks( - const std::vector& block_transfer_info); - - bool d2h_batch_copy(Slice& block_transfer_info); - bool h2d_batch_copy(const uint64_t batch_id, - Slice& block_transfer_info); - - uint32_t offload_to_store(Slice& block_transfer_info); + void init_hierarchy_kv_cache_transfer(); protected: // runtime options @@ -212,12 +199,6 @@ class WorkerImpl { // the task queue, step need to be executed one-by-one ThreadPool threadpool_; - // working thread for data copy - std::unique_ptr h2d_threadpool_; - std::unique_ptr d2h_threadpool_; - // copy streams only can be used in h2d_threadpool_ and d2h_threadpool_ - moodycamel::BlockingConcurrentQueue> copy_stream_; - // dtype of the model torch::ScalarType dtype_; @@ -234,8 +215,6 @@ class WorkerImpl { // kv caches std::vector kv_caches_; - std::vector host_kv_caches_; - std::unique_ptr aligned_tensor_creater_; // causal LM model std::unique_ptr model_; @@ -258,16 +237,9 @@ class WorkerImpl { #if defined(USE_NPU) std::shared_ptr kv_cache_transfer_; - aclrtMemcpyBatchAttr h2d_attrs_; - aclrtMemcpyBatchAttr d2h_attrs_; - - mutable std::mutex mutex_; - std::unordered_map> - layer_wise_load_synchronizer_; #endif - uint64_t key_cache_size_per_layer_; - uint64_t value_cache_size_per_layer_; + std::unique_ptr hierarchy_kv_cache_transfer_; bool is_spec_draft_ = false; @@ -276,26 +248,4 @@ class WorkerImpl { torch::Tensor expert_load_data_; }; -class AlignedTensorCreater { - private: - void* base_ptr_; - size_t total_size_; - - public: - AlignedTensorCreater(const std::vector>& tensor_shapes, - const torch::ScalarType dtype, - const uint32_t num_tensors, - std::vector* 
tensors); - - ~AlignedTensorCreater() { - if (base_ptr_ != nullptr) { - munlock(base_ptr_, total_size_); - munmap(base_ptr_, total_size_); - } - } - - void* get_base_ptr() const { return base_ptr_; } - size_t get_total_size() const { return total_size_; } -}; - } // namespace xllm diff --git a/xllm/core/runtime/xservice_client.cpp b/xllm/core/runtime/xservice_client.cpp index 4d9fbe2be..586d7694c 100644 --- a/xllm/core/runtime/xservice_client.cpp +++ b/xllm/core/runtime/xservice_client.cpp @@ -330,15 +330,6 @@ void XServiceClient::heartbeat() { sizeof(hash_key.data)); } } - - if (event.offload_cache.size()) { - cache_event->mutable_offload_cache()->Reserve( - event.offload_cache.size()); - for (auto& hash_key : event.offload_cache) { - cache_event->add_offload_cache(hash_key.data, - sizeof(hash_key.data)); - } - } } req.mutable_load_metrics()->set_gpu_cache_usage_perc( diff --git a/xllm/core/scheduler/continuous_scheduler.cpp b/xllm/core/scheduler/continuous_scheduler.cpp index baf193d4a..4caea9c29 100644 --- a/xllm/core/scheduler/continuous_scheduler.cpp +++ b/xllm/core/scheduler/continuous_scheduler.cpp @@ -101,7 +101,7 @@ bool ContinuousScheduler::add_request(std::shared_ptr& request) { CHECK(request != nullptr); CHECK(!request->sequences().empty()); - prefetch_from_storage(request); + kv_cache_manager_->prefetch_from_storage(request); if (request_queue_.write(request)) { return true; @@ -212,6 +212,13 @@ void ContinuousScheduler::handle_prefill_requests( << "Waiting request should have only one sequence."; } + if (!kv_cache_manager_->update_prefetch_result( + request, options_.prefetch_timeout())) { + waiting_priority_queue.pop(); + waiting_priority_queue.push(request); + continue; + } + // TODO: FIXME later // Optimization of the scheduling algorithm under multiple sequences // TODO: can refactor like handle_decode otherwise request with multiple @@ -229,7 +236,6 @@ void ContinuousScheduler::handle_prefill_requests( continue; } - prefill_sequence->update_prefetch_result(); // FIXME: use actual num_tokens to handle // Currently overestimating the number of tokens actually processed when // enable prefix cache @@ -810,45 +816,9 @@ std::vector ContinuousScheduler::prepare_batch() { if (!is_batches_empty) { // only update the scheduling latency when there are requests to process COUNTER_ADD(scheduling_latency_seconds, timer.elapsed_seconds()); - - // TODO(kangmeng): abstract this code(and the code below) into a new class - // here. 
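// Illustrative sketch (not from the patch): handle_prefill_requests above now asks
// the KV-cache manager whether a request's storage prefetch has completed (or timed
// out) before admitting it; if not, the request is popped and pushed back so it is
// retried on a later scheduling pass instead of blocking the queue. The simplified
// loop below shows that gating with a std::deque; the real scheduler uses a
// priority queue and additional admission checks, and admit_prefills is a
// hypothetical helper.
#include <cstddef>
#include <deque>
#include <memory>

struct PendingRequest {
  bool prefetch_ready = false;  // true once prefetched blocks are usable or timed out
};

template <typename AdmitFn>
void admit_prefills(std::deque<std::shared_ptr<PendingRequest>>& waiting,
                    AdmitFn admit) {
  std::size_t remaining = waiting.size();  // visit each waiting request once
  while (remaining-- > 0 && !waiting.empty()) {
    auto request = waiting.front();
    waiting.pop_front();
    if (!request->prefetch_ready) {
      waiting.push_back(request);  // revisit on a later pass
      continue;
    }
    admit(request);  // budget / token checks happen here in the real scheduler
  }
}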
- if (kv_cache_manager_->allow_host_block_extend()) { - auto* load_block_transfer_infos = - kv_cache_manager_->get_load_block_transfer_infos(); - - for (int i = 0; i < batches.size(); i++) { - if (!load_block_transfer_infos->at(i).empty()) { - batches[i].set_batch_id(); - engine_->transfer_kv_blocks( - i, - batches[i].batch_id(), - std::move(load_block_transfer_infos->at(i))); - } - } - } - } - - if (kv_cache_manager_->allow_host_block_extend()) { - auto* offload_block_transfer_infos = - kv_cache_manager_->get_offload_block_transfer_infos(); - - bool is_all_dp_copy_info_empty = true; - std::vector>> futures; - futures.resize(offload_block_transfer_infos->size()); - - for (int i = 0; i < futures.size(); i++) { - if (!offload_block_transfer_infos->at(i).empty()) { - futures[i] = std::move(engine_->transfer_kv_blocks( - i, std::move(offload_block_transfer_infos->at(i)))); - - is_all_dp_copy_info_empty = false; - } - } - - if (!is_all_dp_copy_info_empty) { - kv_cache_manager_->postprocess_offload(futures); - } + kv_cache_manager_->transfer_blocks(batches); + } else { + kv_cache_manager_->transfer_blocks(std::nullopt); } GAUGE_SET(num_pending_requests, @@ -914,39 +884,6 @@ std::vector ContinuousScheduler::schedule_request( return batch; } -void ContinuousScheduler::prefetch_from_storage( - std::shared_ptr& request) { - if (request->sequences()[0]->kv_state().num_kv_blocks() != 0 || - request->sequences()[0]->host_kv_state().num_kv_blocks() != 0) { - LOG(ERROR) - << "prefetch_from_storage can only be called before prepare batch!"; - return; - } - for (auto& prefill_sequence : request->sequences()) { - const size_t num_additional_blocks = - kv_cache_manager_->pre_allocate(prefill_sequence.get()); - if (num_additional_blocks > 0) { - const auto host_blocks = prefill_sequence->host_kv_state().kv_blocks(); - std::vector block_transfer_infos; - block_transfer_infos.reserve(num_additional_blocks); - for (int i = host_blocks.size() - num_additional_blocks; - i < host_blocks.size(); - i++) { - block_transfer_infos.emplace_back( - BlockTransferInfo(-1, - host_blocks[i].id(), - host_blocks[i].get_immutable_hash_value(), - TransferType::G2H)); - } - - engine_->prefetch_from_storage(prefill_sequence->dp_rank(), - prefill_sequence->get_termination_flag(), - std::move(block_transfer_infos), - prefill_sequence->get_prefetch_results()); - } - } -} - // step the scheduler forward by one step // may get blocked if there are no requests to process void ContinuousScheduler::step(const absl::Duration& timeout) { diff --git a/xllm/core/scheduler/continuous_scheduler.h b/xllm/core/scheduler/continuous_scheduler.h index 4167b7416..f3b840156 100644 --- a/xllm/core/scheduler/continuous_scheduler.h +++ b/xllm/core/scheduler/continuous_scheduler.h @@ -121,6 +121,9 @@ class ContinuousScheduler : public Scheduler { // Index ID for internal server ID, which must be set different values // if the model supports multiple version or there are multiple models. 
PROPERTY(int64_t, server_idx) = 0; + + // Prefetch timeout for prefetch from kv cache store + PROPERTY(uint32_t, prefetch_timeout) = 0; }; ContinuousScheduler(Engine* engine, const Options& options); @@ -270,8 +273,6 @@ class ContinuousScheduler : public Scheduler { size_t& num_online_decode_preempt_offline_requests, std::unique_ptr& running_queue); - virtual void prefetch_from_storage(std::shared_ptr& request); - void handle_abnormal_request( std::unique_ptr& running_queue, const std::vector& candidate_sequences, diff --git a/xllm/core/scheduler/prefill_only_scheduler.cpp b/xllm/core/scheduler/prefill_only_scheduler.cpp index 9167f5f01..6b84405f2 100644 --- a/xllm/core/scheduler/prefill_only_scheduler.cpp +++ b/xllm/core/scheduler/prefill_only_scheduler.cpp @@ -85,6 +85,13 @@ void PrefillOnlyScheduler::handle_prefill_requests( << "Waiting request should have only one sequence."; } + if (!kv_cache_manager_->update_prefetch_result( + request, options_.prefetch_timeout())) { + waiting_priority_queue.pop(); + waiting_priority_queue.push(request); + continue; + } + // TODO: FIXME later // Optimization of the scheduling algorithm under multiple sequences // TODO: can refactor like handle_decode otherwise request with multiple @@ -102,7 +109,6 @@ void PrefillOnlyScheduler::handle_prefill_requests( continue; } - prefill_sequence->update_prefetch_result(); // FIXME: use actual num_tokens to handle // Currently overestimating the number of tokens actually processed when // enable prefix cache @@ -291,7 +297,6 @@ void PrefillOnlyScheduler::handle_last_step_prefill_requests( continue; } - prefill_sequence->update_prefetch_result(); // FIXME: use actual num_tokens to handle // Currently overestimating the number of tokens actually processed when // enable prefix cache diff --git a/xllm/core/util/blockingconcurrentqueue.h b/xllm/core/util/blockingconcurrentqueue.h index f3f39fe5c..be3fcd5a0 100644 --- a/xllm/core/util/blockingconcurrentqueue.h +++ b/xllm/core/util/blockingconcurrentqueue.h @@ -606,4 +606,4 @@ inline void swap(BlockingConcurrentQueue& a, } } // end namespace moodycamel -} // namespace xllm +} // namespace xllm \ No newline at end of file diff --git a/xllm/models/llm/npu/deepseek_v2.h b/xllm/models/llm/npu/deepseek_v2.h index d040379d6..cd08cdd1b 100644 --- a/xllm/models/llm/npu/deepseek_v2.h +++ b/xllm/models/llm/npu/deepseek_v2.h @@ -172,10 +172,8 @@ class DeepseekV2ModelImpl : public torch::nn::Module { event = input_params.layer_synchronizer->get_event(i); event_flag = input_params.layer_synchronizer->get_event_flag(i); } - if (input_params.layer_wise_load_synchronizer != nullptr) { - if (!input_params.layer_wise_load_synchronizer->synchronize_layer(i)) { - return torch::Tensor(); - } + if (!input_params.synchronize_layer(i)) { + return torch::Tensor(); } auto& layer = layers_[i]; diff --git a/xllm/models/llm/npu/deepseek_v2_mtp.h b/xllm/models/llm/npu/deepseek_v2_mtp.h index 4472b059e..8a7f3da56 100644 --- a/xllm/models/llm/npu/deepseek_v2_mtp.h +++ b/xllm/models/llm/npu/deepseek_v2_mtp.h @@ -124,6 +124,12 @@ class DeepseekV2MtpModelImpl : public torch::nn::Module { auto attn_mask = attn_mask_.get_attn_mask( 128, cos_pos.dtype().toScalarType(), cos_pos.device()); + + // TODO(liangzhiwei20): MTP need more support for layer wise copy. 
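// Illustrative sketch (not from the patch): the input_params.synchronize_layer(i)
// calls used in the decoder loops above gate layer i on the H2D copy batch that
// carries its KV blocks. Copies are grouped into layers_wise_copy_batchs layers
// with one event and ready flag per group, so the forward pass only waits at the
// first layer of each group (layer_idx % layers_per_batch == 0). A CPU-only
// approximation with atomic flags standing in for NPU events; LayerGroupGate is a
// hypothetical name.
#include <atomic>
#include <cstdint>
#include <thread>
#include <vector>

class LayerGroupGate {
 public:
  LayerGroupGate(uint32_t num_layers, uint32_t layers_per_batch)
      : layers_per_batch_(layers_per_batch),
        ready_((num_layers + layers_per_batch - 1) / layers_per_batch) {
    for (auto& flag : ready_) flag.store(false, std::memory_order_relaxed);
  }

  // Called by the copy thread once group g has landed on the device.
  void mark_ready(uint32_t g) { ready_[g].store(true, std::memory_order_release); }

  // Called by the forward loop before running layer layer_idx.
  void wait_for_layer(uint32_t layer_idx) {
    if (layer_idx % layers_per_batch_ != 0) return;  // only group boundaries wait
    auto& flag = ready_[layer_idx / layers_per_batch_];
    while (!flag.load(std::memory_order_acquire)) {
      std::this_thread::yield();  // the real code waits on an aclrtEvent instead
    }
  }

 private:
  uint32_t layers_per_batch_;
  std::vector<std::atomic<bool>> ready_;
};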
+ if (input_params.layer_wise_load_synchronizer != nullptr) { + LOG(FATAL) << "MTP not support layer wise copy!"; + } + for (size_t i = 0; i < layers_.size(); i++) { aclrtEvent* event = nullptr; std::atomic* event_flag = nullptr; @@ -131,11 +137,6 @@ class DeepseekV2MtpModelImpl : public torch::nn::Module { event = input_params.layer_synchronizer->get_event(i); event_flag = input_params.layer_synchronizer->get_event_flag(i); } - if (input_params.layer_wise_load_synchronizer != nullptr) { - if (!input_params.layer_wise_load_synchronizer->synchronize_layer(i)) { - return torch::Tensor(); - } - } auto& layer = layers_[i]; layer(h, diff --git a/xllm/models/llm/npu/glm4.h b/xllm/models/llm/npu/glm4.h index d5999c543..6e12427d7 100644 --- a/xllm/models/llm/npu/glm4.h +++ b/xllm/models/llm/npu/glm4.h @@ -149,10 +149,8 @@ class Glm4ModelImpl : public LlmModelImplBase { event = input_params.layer_synchronizer->get_event(i); event_flag = input_params.layer_synchronizer->get_event_flag(i); } - if (input_params.layer_wise_load_synchronizer != nullptr) { - if (!input_params.layer_wise_load_synchronizer->synchronize_layer(i)) { - return torch::Tensor(); - } + if (!input_params.synchronize_layer(i)) { + return torch::Tensor(); } auto& layer = layers_[i]; diff --git a/xllm/models/llm/npu/glm4_moe.h b/xllm/models/llm/npu/glm4_moe.h index 256df1e3e..244a98f2e 100644 --- a/xllm/models/llm/npu/glm4_moe.h +++ b/xllm/models/llm/npu/glm4_moe.h @@ -209,10 +209,8 @@ class Glm4MoeModelImpl : public torch::nn::Module { event = input_params.layer_synchronizer->get_event(i); event_flag = input_params.layer_synchronizer->get_event_flag(i); } - if (input_params.layer_wise_load_synchronizer != nullptr) { - if (!input_params.layer_wise_load_synchronizer->synchronize_layer(i)) { - return torch::Tensor(); - } + if (!input_params.synchronize_layer(i)) { + return torch::Tensor(); } auto& layer = layers_[i]; diff --git a/xllm/models/llm/npu/glm4_moe_mtp.h b/xllm/models/llm/npu/glm4_moe_mtp.h index ccbd6a119..f744f887d 100644 --- a/xllm/models/llm/npu/glm4_moe_mtp.h +++ b/xllm/models/llm/npu/glm4_moe_mtp.h @@ -147,6 +147,11 @@ class Glm4MoeMtpModelImpl : public torch::nn::Module { input_length * num_experts_per_tok_, torch::TensorOptions().dtype(torch::kInt32).device(tokens.device())); + // TODO(liangzhiwei20): MTP need more support for layer wise copy. + if (input_params.layer_wise_load_synchronizer != nullptr) { + LOG(FATAL) << "MTP not support layer wise copy!"; + } + for (size_t i = 0; i < layers_.size(); i++) { aclrtEvent* event = nullptr; std::atomic* event_flag = nullptr; @@ -154,12 +159,6 @@ class Glm4MoeMtpModelImpl : public torch::nn::Module { event = input_params.layer_synchronizer->get_event(i); event_flag = input_params.layer_synchronizer->get_event_flag(i); } - // TODO(liangzhiwei20): MTP need more support for layer wise copy. 
- if (input_params.layer_wise_load_synchronizer != nullptr) { - if (!input_params.layer_wise_load_synchronizer->synchronize_layer(i)) { - return torch::Tensor(); - } - } auto& layer = layers_[i]; layer(h, diff --git a/xllm/models/llm/npu/llm_model_base.h b/xllm/models/llm/npu/llm_model_base.h index 0940af3db..f00098239 100644 --- a/xllm/models/llm/npu/llm_model_base.h +++ b/xllm/models/llm/npu/llm_model_base.h @@ -199,10 +199,8 @@ class LlmModelImplBase : public torch::nn::Module { event = input_params.layer_synchronizer->get_event(i); event_flag = input_params.layer_synchronizer->get_event_flag(i); } - if (input_params.layer_wise_load_synchronizer != nullptr) { - if (!input_params.layer_wise_load_synchronizer->synchronize_layer(i)) { - return torch::Tensor(); - } + if (!input_params.synchronize_layer(i)) { + return torch::Tensor(); } auto& layer = layers_[i]; diff --git a/xllm/models/llm/npu/qwen3.h b/xllm/models/llm/npu/qwen3.h index 62ff47d30..3afcc6064 100644 --- a/xllm/models/llm/npu/qwen3.h +++ b/xllm/models/llm/npu/qwen3.h @@ -163,10 +163,8 @@ class QWen3ModelImpl : public LlmModelImplBase { event = input_params.layer_synchronizer->get_event(i); event_flag = input_params.layer_synchronizer->get_event_flag(i); } - if (input_params.layer_wise_load_synchronizer != nullptr) { - if (!input_params.layer_wise_load_synchronizer->synchronize_layer(i)) { - return torch::Tensor(); - } + if (!input_params.synchronize_layer(i)) { + return torch::Tensor(); } auto& layer = layers_[i]; diff --git a/xllm/models/llm/npu/qwen3_moe.h b/xllm/models/llm/npu/qwen3_moe.h index a28edca33..6912b0f12 100644 --- a/xllm/models/llm/npu/qwen3_moe.h +++ b/xllm/models/llm/npu/qwen3_moe.h @@ -260,10 +260,8 @@ class Qwen3MoeModelImpl : public torch::nn::Module { event = input_params.layer_synchronizer->get_event(i); event_flag = input_params.layer_synchronizer->get_event_flag(i); } - if (input_params.layer_wise_load_synchronizer != nullptr) { - if (!input_params.layer_wise_load_synchronizer->synchronize_layer(i)) { - return torch::Tensor(); - } + if (!input_params.synchronize_layer(i)) { + return torch::Tensor(); } auto& layer = layers_[i]; diff --git a/xllm/models/llm/qwen3_moe.h b/xllm/models/llm/qwen3_moe.h index 07f089817..aa7055b30 100644 --- a/xllm/models/llm/qwen3_moe.h +++ b/xllm/models/llm/qwen3_moe.h @@ -225,7 +225,6 @@ class Qwen3MoeModelImpl : public torch::nn::Module { } auto deep_stacks = input_params.deep_stacks; int deep_stack_size = deep_stacks.size(); - ModelInputParams modified_input_params = input_params; layer::update_dummy_run_input(dp_rank_, positions, modified_input_params); bool is_prefill = modified_input_params.q_max_seq_len > 1; diff --git a/xllm/proto/xservice.proto b/xllm/proto/xservice.proto index ae7c17954..fe487c4c3 100644 --- a/xllm/proto/xservice.proto +++ b/xllm/proto/xservice.proto @@ -42,7 +42,6 @@ message InstanceMetaInfo { message KvCacheEvent { repeated bytes stored_cache = 1; repeated bytes removed_cache = 2; - repeated bytes offload_cache = 3; } message LoadMetrics { @@ -55,7 +54,6 @@ message LatencyMetrics { int64 recent_max_tbt = 2; } -// TODO: add metainfo/metrics message HeartbeatRequest { string name = 1; KvCacheEvent cache_event = 2; diff --git a/xllm/xllm.cpp b/xllm/xllm.cpp index 8caac2230..c8e25259c 100755 --- a/xllm/xllm.cpp +++ b/xllm/xllm.cpp @@ -181,6 +181,9 @@ int run() { .enable_kvcache_store(FLAGS_enable_kvcache_store && FLAGS_enable_prefix_cache && (FLAGS_host_blocks_factor > 0.0)) + .prefetch_timeout(FLAGS_prefetch_timeout) + 
.prefetch_bacth_size(FLAGS_prefetch_bacth_size) + .layers_wise_copy_batchs(FLAGS_layers_wise_copy_batchs) .store_protocol(FLAGS_store_protocol) .store_master_server_address(FLAGS_store_master_server_address) .store_metadata_server(FLAGS_store_metadata_server)
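
Note on the synchronize_layer refactor in the model hunks above: the per-model null check on layer_wise_load_synchronizer is folded into a single call on ModelInputParams, but the patch does not show that helper's definition. The snippet below is only a minimal sketch of what the call sites appear to rely on, reconstructed from the removed lines; the stand-in types, member names, and signature are assumptions for illustration, not the actual declarations in the xllm headers.

#include <cstddef>
#include <memory>

// Stand-in for the real per-layer load synchronizer (assumption).
struct LayerWiseLoadSynchronizer {
  // The real implementation would block until layer `i`'s KV blocks have
  // finished the H2D copy and return false on failure; this stub just
  // succeeds so the sketch compiles on its own.
  bool synchronize_layer(size_t i) { (void)i; return true; }
};

// Stand-in for ModelInputParams, showing only the helper in question.
struct ModelInputParams {
  std::shared_ptr<LayerWiseLoadSynchronizer> layer_wise_load_synchronizer;

  // Mirrors the removed null-check + synchronize_layer(i) pattern:
  // no synchronizer means there is no layer-wise loading in flight,
  // which is treated as success.
  bool synchronize_layer(size_t i) const {
    if (layer_wise_load_synchronizer == nullptr) {
      return true;
    }
    return layer_wise_load_synchronizer->synchronize_layer(i);
  }
};

With a helper of this shape, every per-layer loop in the patched models collapses to a single early-exit branch, which is exactly what the new call sites do: if (!input_params.synchronize_layer(i)) return torch::Tensor();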