Skip to content

Commit fe86628

Browse files
authored
Allow to Change Channel Spilling Wakeup Callback (#30264)
1 parent b2469aa commit fe86628

File tree

5 files changed

+41
-16
lines changed

5 files changed

+41
-16
lines changed

ydb/library/yql/dq/actors/spilling/channel_storage.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class TDqChannelStorage : public IDqChannelStorage {
3030
NThreading::TFuture<void> IsBlobWrittenFuture_;
3131
};
3232
public:
33-
TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback,
33+
TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback,
3434
TIntrusivePtr<TSpillingTaskCounters> spillingTaskCounters, TActorSystem* actorSystem)
3535
: ActorSystem_(actorSystem)
3636
{
@@ -91,6 +91,10 @@ class TDqChannelStorage : public IDqChannelStorage {
9191
return true;
9292
}
9393

94+
void SetWakeUpCallback(TWakeUpCallback&& wakeUpCallback) override {
95+
ActorSystem_->Send(ChannelStorageActorId_, new TEvDqChannelSpilling::TEvSetWakeUpCallback(std::move(wakeUpCallback)));
96+
}
97+
9498
private:
9599
void UpdateWriteStatus() {
96100
for (auto it = WritingBlobs_.begin(); it != WritingBlobs_.end();) {

ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ namespace {
3535
LOG_WARN_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s);
3636

3737
#define LOG_T(s) \
38-
LOG_TRACE_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s);
38+
LOG_TRACE_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s);
3939

4040
class TDqChannelStorageActor : public IDqChannelStorageActor,
4141
public NActors::TActorBootstrapped<TDqChannelStorageActor>
@@ -98,8 +98,9 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
9898
hFunc(TEvDqSpilling::TEvWriteResult, HandleWork);
9999
hFunc(TEvDqSpilling::TEvReadResult, HandleWork);
100100
hFunc(TEvDqSpilling::TEvError, HandleWork);
101-
hFunc(TEvDqChannelSpilling::TEvGet, HandleWork);
102101
hFunc(TEvDqChannelSpilling::TEvPut, HandleWork);
102+
hFunc(TEvDqChannelSpilling::TEvGet, HandleWork);
103+
hFunc(TEvDqChannelSpilling::TEvSetWakeUpCallback, HandleWork);
103104
cFunc(TEvents::TEvPoison::EventType, PassAway);
104105
default:
105106
Y_ABORT("TDqChannelStorageActor::WorkState unexpected event type: %" PRIx32 " event: %s",
@@ -108,30 +109,33 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
108109
}
109110
}
110111

112+
void HandleWork(TEvDqChannelSpilling::TEvPut::TPtr& ev) {
113+
auto& msg = *ev->Get();
114+
LOG_T("[TEvPut] blobId: " << msg.BlobId_);
115+
116+
auto opBegin = TInstant::Now();
117+
118+
auto writingBlobInfo = TWritingBlobInfo{msg.Blob_.Size(), std::move(msg.Promise_), opBegin};
119+
WritingBlobs_.emplace(msg.BlobId_, std::move(writingBlobInfo));
111120

121+
SendInternal(SpillingActorId_, new TEvDqSpilling::TEvWrite(msg.BlobId_, std::move(msg.Blob_)));
122+
}
112123

113124
void HandleWork(TEvDqChannelSpilling::TEvGet::TPtr& ev) {
114125
auto& msg = *ev->Get();
115126
LOG_T("[TEvGet] blobId: " << msg.BlobId_);
116127

117128
auto opBegin = TInstant::Now();
118-
129+
119130
auto loadingBlobInfo = TLoadingBlobInfo{std::move(msg.Promise_), opBegin};
120131
LoadingBlobs_.emplace(msg.BlobId_, std::move(loadingBlobInfo));
121132

122133
SendInternal(SpillingActorId_, new TEvDqSpilling::TEvRead(msg.BlobId_));
123134
}
124135

125-
void HandleWork(TEvDqChannelSpilling::TEvPut::TPtr& ev) {
136+
void HandleWork(TEvDqChannelSpilling::TEvSetWakeUpCallback::TPtr& ev) {
126137
auto& msg = *ev->Get();
127-
LOG_T("[TEvPut] blobId: " << msg.BlobId_);
128-
129-
auto opBegin = TInstant::Now();
130-
131-
auto writingBlobInfo = TWritingBlobInfo{msg.Blob_.Size(), std::move(msg.Promise_), opBegin};
132-
WritingBlobs_.emplace(msg.BlobId_, std::move(writingBlobInfo));
133-
134-
SendInternal(SpillingActorId_, new TEvDqSpilling::TEvWrite(msg.BlobId_, std::move(msg.Blob_)));
138+
WakeUpCallback_ = std::move(msg.WakeUpCallback_);
135139
}
136140

137141
void HandleWork(TEvDqSpilling::TEvWriteResult::TPtr& ev) {
@@ -203,7 +207,7 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
203207

204208
// BlobId -> blob size + promise that blob is saved
205209
std::unordered_map<ui64, TWritingBlobInfo> WritingBlobs_;
206-
210+
207211
// BlobId -> promise with requested blob
208212
std::unordered_map<ui64, TLoadingBlobInfo> LoadingBlobs_;
209213

ydb/library/yql/dq/actors/spilling/channel_storage_actor.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ namespace NYql::NDq {
1414
struct TDqChannelStorageActorEvents {
1515
enum {
1616
EvPut = EventSpaceBegin(NActors::TEvents::EEventSpace::ES_USERSPACE) + 30100,
17-
EvGet
17+
EvGet,
18+
EvSetWakeUpCallback
1819
};
1920
};
2021

@@ -42,6 +43,15 @@ struct TEvDqChannelSpilling {
4243
ui64 BlobId_;
4344
NThreading::TPromise<TBuffer> Promise_;
4445
};
46+
47+
struct TEvSetWakeUpCallback : NActors::TEventLocal<TEvSetWakeUpCallback, TDqChannelStorageActorEvents::EvSetWakeUpCallback> {
48+
TEvSetWakeUpCallback(TWakeUpCallback&& wakeUpCallback)
49+
: WakeUpCallback_(std::move(wakeUpCallback))
50+
{
51+
}
52+
53+
TWakeUpCallback WakeUpCallback_;
54+
};
4555
};
4656

4757
class IDqChannelStorageActor

ydb/library/yql/dq/runtime/dq_channel_storage.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ class TDqChannelStorageException : public yexception {
1414
class IDqChannelStorage : public TSimpleRefCount<IDqChannelStorage> {
1515
public:
1616
using TPtr = TIntrusivePtr<IDqChannelStorage>;
17+
using TWakeUpCallback = std::function<void()>;
1718

1819
public:
1920
virtual ~IDqChannelStorage() = default;
@@ -30,7 +31,9 @@ class IDqChannelStorage : public TSimpleRefCount<IDqChannelStorage> {
3031
// It is better to replace Get() with Pull() which will delete blob after read
3132
// (current clients read each blob exactly once)
3233
// Get() will return false if data is not ready yet. Client should repeat Get() in this case
33-
virtual bool Get(ui64 blobId, TBuffer& data, ui64 cookie = 0) = 0;
34+
virtual bool Get(ui64 blobId, TBuffer& data, ui64 cookie = 0) = 0;
35+
36+
virtual void SetWakeUpCallback(TWakeUpCallback&& wakeUpCallback) = 0;
3437
};
3538

3639
} // namespace NYql::NDq

ydb/library/yql/dq/runtime/ut/ut_helper.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ class TMockChannelStorage : public IDqChannelStorage {
5959
return true;
6060
}
6161

62+
void SetWakeUpCallback(TWakeUpCallback&& /* wakeUpCallback */) override {
63+
64+
}
65+
6266
public:
6367
void SetBlankGetRequests(ui32 count) {
6468
GetBlankRequests = count;

0 commit comments

Comments
 (0)