Skip to content

Commit 804872d

Browse files
authored
Improve KqpHost introspection, add KqpExecuter and CA virtual pages (#30205)
1 parent 37c50f8 commit 804872d

File tree

6 files changed

+173
-22
lines changed

6 files changed

+173
-22
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1142,6 +1142,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
11421142
hFunc(TEvKqpBuffer::TEvError, Handle);
11431143
hFunc(NFq::TEvCheckpointCoordinator::TEvZeroCheckpointDone, Handle);
11441144
hFunc(NFq::TEvCheckpointCoordinator::TEvRaiseTransientIssues, Handle);
1145+
hFunc(NActors::NMon::TEvHttpInfo, HandleHttpInfo);
11451146
IgnoreFunc(TEvInterconnect::TEvNodeConnected);
11461147
default:
11471148
UnexpectedEvent("ExecuteState", ev->GetTypeRewrite());

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,51 @@ class TKqpExecuterBase : public TActor<TDerived> {
581581
static_cast<TDerived*>(this)->CheckExecutionComplete();
582582
}
583583

584+
void HandleHttpInfo(NMon::TEvHttpInfo::TPtr& ev) {
585+
TStringStream str;
586+
HTML(str) {
587+
PRE() {
588+
str << "KQP Executer, SelfId=" << SelfId() << Endl;
589+
590+
TABLE_SORTABLE_CLASS("table table-condensed") {
591+
TABLEHEAD() {
592+
TABLER() {
593+
TABLEH() {str << "TxId";}
594+
TABLEH() {str << "StageId";}
595+
TABLEH() {str << "TaskId";}
596+
TABLEH() {str << "NodeId";}
597+
TABLEH() {str << "ActorId";}
598+
TABLEH() {str << "Completed";}
599+
}
600+
}
601+
TABLEBODY() {
602+
for (const auto& task : TasksGraph.GetTasks()) {
603+
TABLER() {
604+
TABLED() {str << task.StageId.TxId;}
605+
TABLED() {str << task.StageId.StageId;}
606+
TABLED() {str << task.Id;}
607+
TABLED() {str << task.Meta.NodeId;}
608+
TABLED() {
609+
if (task.ComputeActorId) {
610+
HREF(TStringBuilder() << "/node/" << task.ComputeActorId.NodeId() << "/actors/kqp_node?ca=" << task.ComputeActorId) {
611+
str << task.ComputeActorId;
612+
}
613+
} else {
614+
str << "N/A";
615+
}
616+
str << Endl;
617+
}
618+
TABLED() {str << task.Meta.Completed;}
619+
}
620+
}
621+
}
622+
}
623+
}
624+
}
625+
626+
this->Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str()));
627+
}
628+
584629
STATEFN(ReadyState) {
585630
switch (ev->GetTypeRewrite()) {
586631
hFunc(TEvKqpExecuter::TEvTxRequest, HandleReady);

ydb/core/kqp/executer_actor/kqp_scan_executer.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
105105
hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
106106
IgnoreFunc(TEvKqpNode::TEvCancelKqpTasksResponse);
107107
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected);
108+
hFunc(NActors::NMon::TEvHttpInfo, HandleHttpInfo);
108109
IgnoreFunc(TEvInterconnect::TEvNodeConnected);
109110
default:
110111
UnexpectedEvent("ExecuteState", ev->GetTypeRewrite());

ydb/core/kqp/node_service/kqp_node_service.cpp

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -510,14 +510,28 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
510510
}
511511

512512
void HandleWork(NMon::TEvHttpInfo::TPtr& ev) {
513+
514+
const TCgiParameters &cgi = ev->Get()->Request.GetParams();
515+
TActorId id;
516+
517+
auto caId = cgi.Get("ca");
518+
if (caId && State_->ValidateComputeActorId(caId, id)) {
519+
TActivationContext::Send(ev->Forward(id));
520+
return;
521+
}
522+
523+
auto exId = cgi.Get("ex");
524+
if (exId && State_->ValidateKqpExecuterId(exId, SelfId().NodeId(), id)) {
525+
TActivationContext::Send(ev->Forward(id));
526+
return;
527+
}
528+
513529
TStringStream str;
514530
HTML(str) {
515531
PRE() {
516-
str << "Current config:" << Endl;
532+
str << "TKqpNodeService, SelfId=" << SelfId() << Endl;
533+
str << Endl << "Current config:" << Endl;
517534
str << Config.DebugString() << Endl;
518-
str << Endl;
519-
520-
str << Endl << "Transactions:" << Endl;
521535
State_->DumpInfo(str);
522536
}
523537
}

ydb/core/kqp/node_service/kqp_node_state.cpp

Lines changed: 106 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -113,31 +113,119 @@ std::vector<TNodeRequest::TTaskInfo> TNodeState::GetTasksByTxId(ui64 txId) const
113113
}
114114

115115
void TNodeState::DumpInfo(TStringStream& str) const {
116-
for (const auto& bucket : Buckets) {
117-
TReadGuard guard(bucket.Mutex);
118-
TMap<ui64, TVector<std::pair<const TActorId, const TNodeRequest*>>> byTx;
116+
HTML(str) {
117+
str << Endl << "Transactions:" << Endl;
118+
TABLE_SORTABLE_CLASS("table table-condensed") {
119+
TABLEHEAD() {
120+
TABLER() {
121+
TABLEH() {str << "TxId";}
122+
TABLEH() {str << "Executer";}
123+
TABLEH() {str << "StartTime";}
124+
TABLEH() {str << "Deadline";}
125+
}
126+
}
127+
TABLEBODY() {
128+
for (const auto& bucket : Buckets) {
129+
TReadGuard guard(bucket.Mutex);
130+
TMap<ui64, TVector<std::pair<const TActorId, const TNodeRequest*>>> byTx;
119131

120-
for (const auto& [txId, request] : bucket.Requests) {
121-
byTx[txId].emplace_back(request.ExecuterId, &request);
132+
for (const auto& [txId, request] : bucket.Requests) {
133+
byTx[txId].emplace_back(request.ExecuterId, &request);
134+
}
135+
136+
for (const auto& [txId, requests] : byTx) {
137+
for (auto& [requester, request] : requests) {
138+
TABLER() {
139+
TABLED() {str << txId;}
140+
TABLED() {
141+
HREF(TStringBuilder() << "/node/" << requester.NodeId() << "/actors/kqp_node?ex=" << requester) {
142+
str << requester;
143+
}
144+
}
145+
TABLED() {str << request->StartTime;}
146+
TABLED() {str << request->Deadline;}
147+
}
148+
}
149+
}
150+
}
151+
}
122152
}
123-
for (const auto& [txId, requests] : byTx) {
124-
str << " Requests:" << Endl;
125-
for (auto& [requester, request] : requests) {
126-
str << " Requester: " << requester << Endl;
127-
str << " StartTime: " << request->StartTime << Endl;
128-
str << " Deadline: " << request->Deadline << Endl;
129-
str << " In-fly tasks:" << Endl;
130-
for (auto& [taskId, actorId] : request->Tasks) {
131-
str << " Task: " << taskId << Endl;
132-
if (actorId) {
133-
str << " Compute actor: " << *actorId << Endl;
134-
} else {
135-
str << " Compute actor: (task not started yet)" << Endl;
153+
154+
str << Endl << "Tasks:" << Endl;
155+
TABLE_SORTABLE_CLASS("table table-condensed") {
156+
TABLEHEAD() {
157+
TABLER() {
158+
TABLEH() {str << "TxId";}
159+
TABLEH() {str << "Executer";}
160+
TABLEH() {str << "TaskId";}
161+
TABLEH() {str << "ComputeActorId";}
162+
}
163+
}
164+
TABLEBODY() {
165+
for (const auto& bucket : Buckets) {
166+
TReadGuard guard(bucket.Mutex);
167+
TMap<ui64, TVector<std::pair<const TActorId, const TNodeRequest*>>> byTx;
168+
169+
for (const auto& [txId, request] : bucket.Requests) {
170+
byTx[txId].emplace_back(request.ExecuterId, &request);
171+
}
172+
173+
for (const auto& [txId, requests] : byTx) {
174+
for (auto& [requester, request] : requests) {
175+
for (auto& [taskId, actorId] : request->Tasks) {
176+
TABLER() {
177+
TABLED() {str << txId;}
178+
TABLED() {
179+
HREF(TStringBuilder() << "/node/" << requester.NodeId() << "/actors/kqp_node?ex=" << requester) {
180+
str << requester;
181+
}
182+
}
183+
TABLED() {str << taskId;}
184+
TABLED() {
185+
if (actorId) {
186+
HREF(TStringBuilder() << "/node/" << requester.NodeId() << "/actors/kqp_node?ca=" << *actorId) {
187+
str << *actorId;
188+
}
189+
} else {
190+
str << "N/A";
191+
}
192+
}
193+
}
194+
}
195+
}
136196
}
137197
}
138198
}
139199
}
140200
}
141201
}
142202

203+
bool TNodeState::ValidateComputeActorId(const TString& computeActorId, TActorId& id) const {
204+
for (const auto& bucket : Buckets) {
205+
TReadGuard guard(bucket.Mutex);
206+
for (const auto& [_, request] : bucket.Requests) {
207+
for (auto& [_, actorId] : request.Tasks) {
208+
if (actorId && ToString(*actorId) == computeActorId) {
209+
id = *actorId;
210+
return true;
211+
}
212+
}
213+
}
214+
}
215+
return false;
216+
}
217+
218+
bool TNodeState::ValidateKqpExecuterId(const TString& kqpExecuterId, ui32 nodeId, TActorId& id) const {
219+
for (const auto& bucket : Buckets) {
220+
TReadGuard guard(bucket.Mutex);
221+
for (const auto& [_, request] : bucket.Requests) {
222+
if (ToString(request.ExecuterId) == kqpExecuterId && request.ExecuterId.NodeId() == nodeId) {
223+
id = request.ExecuterId;
224+
return true;
225+
}
226+
}
227+
}
228+
return false;
229+
}
230+
143231
} // namespace NKikimr::NKqp::NKqpNode

ydb/core/kqp/node_service/kqp_node_state.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ class TNodeState {
6161
std::vector<TNodeRequest::TTaskInfo> GetTasksByTxId(ui64 txId) const;
6262

6363
void DumpInfo(TStringStream& str) const;
64+
bool ValidateComputeActorId(const TString& computeActorId, TActorId& id) const;
65+
bool ValidateKqpExecuterId(const TString& kqpExecuterId, ui32 nodeId, TActorId& id) const;
6466

6567
private:
6668
inline auto& GetBucketByTxId(ui64 txId) {

0 commit comments

Comments
 (0)