Skip to content

Commit 875da2e

Browse files
committed
feat: support prefetch with timeout.
1 parent 200b560 commit 875da2e

File tree

13 files changed

+93
-25
lines changed

13 files changed

+93
-25
lines changed

xllm/core/distributed_runtime/comm_channel.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -372,13 +372,13 @@ void CommChannel::transfer_kv_blocks(
372372

373373
class ClientStreamReceiver : public brpc::StreamInputHandler {
374374
private:
375-
const std::atomic<bool>& termination_flag_;
375+
std::atomic<bool>* termination_flag_;
376376
std::shared_ptr<std::atomic<uint32_t>> success_cnt_;
377377
std::promise<void> close_promise_;
378378
std::atomic<bool> promise_set_{false};
379379

380380
public:
381-
ClientStreamReceiver(const std::atomic<bool>& termination_flag,
381+
ClientStreamReceiver(std::atomic<bool>* termination_flag,
382382
std::shared_ptr<std::atomic<uint32_t>>& success_cnt)
383383
: termination_flag_(termination_flag), success_cnt_(success_cnt) {}
384384

@@ -398,9 +398,10 @@ class ClientStreamReceiver : public brpc::StreamInputHandler {
398398
int32_t success_cnt = std::stoi(msg_str);
399399

400400
if (success_cnt > 0 &&
401-
!termination_flag_.load(std::memory_order_acquire)) {
401+
!termination_flag_->load(std::memory_order_acquire)) {
402402
success_cnt_->fetch_add(success_cnt, std::memory_order_relaxed);
403403
} else {
404+
termination_flag_->store(true, std::memory_order_release);
404405
brpc::StreamClose(id);
405406
if (!promise_set_.exchange(true)) {
406407
close_promise_.set_value();
@@ -425,8 +426,8 @@ class ClientStreamReceiver : public brpc::StreamInputHandler {
425426
};
426427

427428
void CommChannel::prefetch_from_storage(
428-
const std::atomic<bool>& flag,
429429
const std::vector<BlockTransferInfo>& block_transfer_info,
430+
std::atomic<bool>* flag,
430431
std::shared_ptr<std::atomic<uint32_t>>& success_cnt) {
431432
proto::BlockTransferInfos pb_block_transfer_info;
432433
if (!block_transfer_info_to_proto(block_transfer_info,

xllm/core/distributed_runtime/comm_channel.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ class CommChannel {
9898
const std::vector<BlockTransferInfo>& block_transfer_info);
9999

100100
virtual void prefetch_from_storage(
101-
const std::atomic<bool>& flag,
102101
const std::vector<BlockTransferInfo>& block_transfer_info,
102+
std::atomic<bool>* flag,
103103
std::shared_ptr<std::atomic<uint32_t>>& success_cnt);
104104

105105
virtual bool get_last_step_result_async(

xllm/core/distributed_runtime/remote_worker.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -313,15 +313,15 @@ void RemoteWorker::transfer_kv_blocks(
313313
}
314314

315315
void RemoteWorker::prefetch_from_storage(
316-
const std::atomic<bool>& flag,
317316
const std::vector<BlockTransferInfo>& block_transfer_info,
317+
std::atomic<bool>* flag,
318318
std::shared_ptr<std::atomic<uint32_t>>& success_cnt) {
319319
copy_threadpool_.schedule(
320320
[this,
321-
flag = &flag,
321+
flag = flag,
322322
block_transfer_info = std::move(block_transfer_info),
323323
success_cnt = success_cnt]() mutable {
324-
channel_->prefetch_from_storage(flag, block_transfer_info, success_cnt);
324+
channel_->prefetch_from_storage(block_transfer_info, flag, success_cnt);
325325
});
326326
}
327327

xllm/core/distributed_runtime/remote_worker.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@ class RemoteWorker : public WorkerClient {
120120
const std::vector<BlockTransferInfo>& block_transfer_info) override;
121121

122122
virtual void prefetch_from_storage(
123-
const std::atomic<bool>& flag,
124123
const std::vector<BlockTransferInfo>& block_transfer_info,
124+
std::atomic<bool>* flag,
125125
std::shared_ptr<std::atomic<uint32_t>>& success_cnt) override;
126126

127127
// Run the model and return the output.

xllm/core/framework/request/sequence.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,7 @@ void Sequence::add_host_kv_blocks(const std::vector<Block>& blocks) {
381381
// Resets kv_state_, host_kv_state_ and timeout_checker_, then sets
// volatile_num_prompt_tokens_ to the current value of num_tokens_.
void Sequence::reset() {
382382
kv_state_.reset();
383383
host_kv_state_.reset();
384+
timeout_checker_.reset();
384385
volatile_num_prompt_tokens_ = num_tokens_;
385386
}
386387

@@ -455,9 +456,14 @@ Slice<int32_t> Sequence::get_generated_tokens() const {
455456
return {tokens_.data(), 0};
456457
}
457458

458-
void Sequence::update_prefetch_result() {
459+
bool Sequence::update_prefetch_result() {
459460
if (prefetch_results_.empty()) {
460-
return;
461+
return true;
462+
}
463+
464+
if (!termination_flag_.load(std::memory_order_acquire) &&
465+
timeout_checker_.check_timeout()) {
466+
return false;
461467
}
462468

463469
termination_flag_.store(true, std::memory_order_release);
@@ -470,6 +476,7 @@ void Sequence::update_prefetch_result() {
470476
success_cnt * host_kv_state_.kv_blocks()[0].size());
471477
}
472478
prefetch_results_.clear();
479+
return true;
473480
}
474481

475482
} // namespace xllm

xllm/core/framework/request/sequence.h

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ limitations under the License.
1919
#include <absl/time/time.h>
2020
#include <folly/futures/Future.h>
2121

22+
#include <chrono>
2223
#include <cstdint>
2324
#include <vector>
2425

@@ -81,6 +82,44 @@ struct SequenceParams {
8182
StoppingChecker* stopping_checker; // not owned
8283
};
8384

85+
static uint32_t timeout_ms = 0;
86+
class TimeoutChecker {
87+
private:
88+
std::chrono::steady_clock::time_point timeout_start_;
89+
bool is_timeout_set_ = false;
90+
91+
public:
92+
TimeoutChecker() { init(); }
93+
94+
bool check_timeout() {
95+
if (!is_timeout_set_) {
96+
timeout_start_ = std::chrono::steady_clock::now();
97+
is_timeout_set_ = true;
98+
99+
return false;
100+
} else {
101+
auto now = std::chrono::steady_clock::now();
102+
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
103+
now - timeout_start_);
104+
105+
return elapsed.count() >= timeout_ms;
106+
}
107+
}
108+
109+
void reset() { is_timeout_set_ = false; }
110+
111+
private:
112+
static void init_timeout() {
113+
const char* env_str = std::getenv("PREFETCH_TIMEOUT_MS");
114+
timeout_ms = env_str ? std::strtoul(env_str, nullptr, 10) : 0;
115+
LOG(INFO) << "Prefetch timeout set as: " << timeout_ms;
116+
}
117+
static void init() {
118+
static std::once_flag flag_;
119+
std::call_once(flag_, init_timeout);
120+
}
121+
};
122+
84123
class Sequence final {
85124
public:
86125
Sequence(size_t index,
@@ -242,12 +281,12 @@ class Sequence final {
242281
const Tokenizer& tokenizer,
243282
std::optional<std::vector<LogProb>>& out_logprobs);
244283

245-
const std::atomic<bool>& get_termination_flag() { return termination_flag_; }
284+
std::atomic<bool>* get_termination_flag() { return &termination_flag_; }
246285
std::vector<std::shared_ptr<std::atomic<uint32_t>>>* get_prefetch_results() {
247286
return &prefetch_results_;
248287
}
249288

250-
void update_prefetch_result();
289+
bool update_prefetch_result();
251290

252291
void reset();
253292

@@ -361,6 +400,8 @@ class Sequence final {
361400
// kvcache store copy async result
362401
std::atomic<bool> termination_flag_{false};
363402
std::vector<std::shared_ptr<std::atomic<uint32_t>>> prefetch_results_;
403+
404+
TimeoutChecker timeout_checker_;
364405
};
365406

366407
} // namespace xllm

xllm/core/runtime/engine.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ class Engine {
9696

9797
// Base-class default: performs no prefetch and only logs a FATAL
// "not implemented" message. Declared virtual so that an engine which
// supports prefetching can override it.
virtual void prefetch_from_storage(
9898
const uint32_t dp_rank,
99-
const std::atomic<bool>& flag,
10099
const std::vector<BlockTransferInfo>& block_transfer_info,
100+
std::atomic<bool>* flag,
101101
std::vector<std::shared_ptr<std::atomic<uint32_t>>>* prefetch_results) {
102102
LOG(FATAL) << " prefetch_from_storage is not implemented!";
103103
};

xllm/core/runtime/llm_engine.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -510,15 +510,15 @@ void LLMEngine::transfer_kv_blocks(
510510

511511
void LLMEngine::prefetch_from_storage(
512512
const uint32_t dp_rank,
513-
const std::atomic<bool>& flag,
514513
const std::vector<BlockTransferInfo>& block_transfer_info,
514+
std::atomic<bool>* flag,
515515
std::vector<std::shared_ptr<std::atomic<uint32_t>>>* prefetch_results) {
516-
prefetch_results->resize(dp_local_tp_size_,
517-
std::make_shared<std::atomic<uint32_t>>(0));
516+
prefetch_results->reserve(dp_local_tp_size_);
518517
for (auto tp_rank = 0; tp_rank < dp_local_tp_size_; ++tp_rank) {
518+
prefetch_results->emplace_back(std::make_shared<std::atomic<uint32_t>>(0));
519519
worker_clients_[tp_rank + dp_local_tp_size_ * dp_rank]
520520
->prefetch_from_storage(
521-
flag, block_transfer_info, prefetch_results->at(tp_rank));
521+
block_transfer_info, flag, prefetch_results->at(tp_rank));
522522
}
523523
}
524524

xllm/core/runtime/llm_engine.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,8 @@ class LLMEngine : public Engine {
8282

8383
void prefetch_from_storage(
8484
const uint32_t dp_rank,
85-
const std::atomic<bool>& flag,
8685
const std::vector<BlockTransferInfo>& block_transfer_info,
86+
std::atomic<bool>* flag,
8787
std::vector<std::shared_ptr<std::atomic<uint32_t>>>* prefetch_results)
8888
override;
8989

xllm/core/runtime/worker_client.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,8 @@ folly::SemiFuture<uint32_t> WorkerClient::transfer_kv_blocks(
166166
}
167167

168168
// Default implementation: performs no prefetch and only logs a FATAL
// message stating that the method is unimplemented.
void WorkerClient::prefetch_from_storage(
169-
const std::atomic<bool>& flag,
170169
const std::vector<BlockTransferInfo>& block_transfer_info,
170+
std::atomic<bool>* flag,
171171
std::shared_ptr<std::atomic<uint32_t>>& success_cnt) {
172172
LOG(FATAL) << "WorkerClient Method prefetch_from_storage is UnImplemented.";
173173
}

0 commit comments

Comments
 (0)