From 4a5422421d87a6592f6b8caaa95d1d795d355c2d Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Mon, 13 Oct 2025 15:15:05 +0300 Subject: [PATCH 01/13] statistics: support multiple stat types in SaveStatisticsQuery --- .../statistics/aggregator/aggregator_impl.cpp | 26 +++---- .../statistics/aggregator/aggregator_impl.h | 1 + .../statistics/aggregator/analyze_actor.cpp | 18 +++-- .../aggregator/tx_aggr_stat_response.cpp | 4 + .../aggregator/tx_finish_trasersal.cpp | 17 +++- ydb/core/statistics/database/database.cpp | 77 ++++++++++--------- ydb/core/statistics/database/database.h | 4 +- .../statistics/database/ut/ut_database.cpp | 18 +++-- ydb/core/statistics/events.h | 22 ++++++ 9 files changed, 115 insertions(+), 72 deletions(-) diff --git a/ydb/core/statistics/aggregator/aggregator_impl.cpp b/ydb/core/statistics/aggregator/aggregator_impl.cpp index 7ca28a316834..66c58f726700 100644 --- a/ydb/core/statistics/aggregator/aggregator_impl.cpp +++ b/ydb/core/statistics/aggregator/aggregator_impl.cpp @@ -608,25 +608,23 @@ void TStatisticsAggregator::SaveStatisticsToTable() { PendingSaveStatistics = false; - std::vector columnTags; - std::vector data; - auto count = CountMinSketches.size(); - if (count == 0) { - Send(SelfId(), new TEvStatistics::TEvSaveStatisticsQueryResponse( - Ydb::StatusIds::SUCCESS, {}, TraversalPathId)); - return; - } - columnTags.reserve(count); - data.reserve(count); + std::vector items = std::exchange(StatisticsToSave, {}); for (auto& [tag, sketch] : CountMinSketches) { - columnTags.push_back(tag); + if (!ColumnNames.contains(tag)) { + continue; + } TString strSketch(sketch->AsStringBuf()); - data.push_back(strSketch); + items.emplace_back(tag, EStatType::COUNT_MIN_SKETCH, std::move(strSketch)); + } + + if (items.empty()) { + Send(SelfId(), new TEvStatistics::TEvSaveStatisticsQueryResponse( + Ydb::StatusIds::SUCCESS, {}, TraversalPathId)); + return; } - Register(CreateSaveStatisticsQuery(SelfId(), Database, - TraversalPathId, EStatType::COUNT_MIN_SKETCH, std::move(columnTags), std::move(data))); + Register(CreateSaveStatisticsQuery(SelfId(), Database, TraversalPathId, std::move(items))); } void TStatisticsAggregator::DeleteStatisticsFromTable() { diff --git a/ydb/core/statistics/aggregator/aggregator_impl.h b/ydb/core/statistics/aggregator/aggregator_impl.h index 40c63c907e6a..f394b65a2ec7 100644 --- a/ydb/core/statistics/aggregator/aggregator_impl.h +++ b/ydb/core/statistics/aggregator/aggregator_impl.h @@ -284,6 +284,7 @@ class TStatisticsAggregator : public TActor, public NTabl bool IsStatisticsTableCreated = false; bool PendingSaveStatistics = false; + std::vector StatisticsToSave; bool PendingDeleteStatistics = false; std::vector KeyColumnTypes; diff --git a/ydb/core/statistics/aggregator/analyze_actor.cpp b/ydb/core/statistics/aggregator/analyze_actor.cpp index 611b9055c70f..a4014b3c64fe 100644 --- a/ydb/core/statistics/aggregator/analyze_actor.cpp +++ b/ydb/core/statistics/aggregator/analyze_actor.cpp @@ -280,27 +280,29 @@ void TAnalyzeActor::Handle(TEvPrivate::TEvAnalyzeScanResult::TPtr& ev) { return; } - auto response = std::make_unique(); - auto& record = response->Record; + std::vector items; for (const auto& col : Columns) { - auto* column = record.AddColumns(); - column->SetTag(col.Tag); + TString data; - auto* stat = column->AddStatistics(); - stat->SetType(NKikimr::NStat::COUNT_MIN_SKETCH); NYdb::TValueParser val(result.AggColumns.at(col.Seq)); val.OpenOptional(); if (!val.IsNull()) { const auto& bytes = val.GetBytes(); - stat->SetData(bytes.data(), bytes.size()); + data.assign(bytes.data(), bytes.size()); } else { auto defaultVal = std::unique_ptr( TCountMinSketch::Create(CMS_WIDTH, CMS_DEPTH)); auto bytes = defaultVal->AsStringBuf(); - stat->SetData(bytes.data(), bytes.size()); + data.assign(bytes.data(), bytes.size()); } + + items.emplace_back( + col.Tag, + NKikimr::NStat::COUNT_MIN_SKETCH, + data); } + auto response = std::make_unique(std::move(items)); Send(Parent, response.release()); PassAway(); } diff --git a/ydb/core/statistics/aggregator/tx_aggr_stat_response.cpp b/ydb/core/statistics/aggregator/tx_aggr_stat_response.cpp index 3b25442ba0d8..4ced49e7a987 100644 --- a/ydb/core/statistics/aggregator/tx_aggr_stat_response.cpp +++ b/ydb/core/statistics/aggregator/tx_aggr_stat_response.cpp @@ -37,6 +37,10 @@ struct TStatisticsAggregator::TTxAggregateStatisticsResponse : public TTxBase { auto tag = column.GetTag(); for (auto& statistic : column.GetStatistics()) { if (statistic.GetType() == NKikimr::NStat::COUNT_MIN_SKETCH) { + if (!Self->ColumnNames.contains(tag)) { + continue; + } + const auto& cmsStr = statistic.GetData(); std::unique_ptr cms(TCountMinSketch::FromString( cmsStr.data(), cmsStr.size())); diff --git a/ydb/core/statistics/aggregator/tx_finish_trasersal.cpp b/ydb/core/statistics/aggregator/tx_finish_trasersal.cpp index 6835654f6e8f..4fdd825efc6e 100644 --- a/ydb/core/statistics/aggregator/tx_finish_trasersal.cpp +++ b/ydb/core/statistics/aggregator/tx_finish_trasersal.cpp @@ -68,13 +68,22 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvDeleteStatisticsQueryRespon } void TStatisticsAggregator::Handle(TEvStatistics::TEvFinishTraversal::TPtr& ev) { using EStatus = TEvStatistics::TEvFinishTraversal::EStatus; - if (ev->Get()->Status == EStatus::TableNotFound) { + switch (ev->Get()->Status) { + case EStatus::Success: + std::move( + ev->Get()->Statistics.begin(), ev->Get()->Statistics.end(), + std::back_inserter(StatisticsToSave)); + SaveStatisticsToTable(); + return; + case EStatus::TableNotFound: DeleteStatisticsFromTable(); return; + case EStatus::InternalError: + Execute( + new TTxFinishTraversal(this, false), + TActivationContext::AsActorContext()); + return; } - Execute( - new TTxFinishTraversal(this, ev->Get()->Status == EStatus::Success), - TActivationContext::AsActorContext()); } } // NKikimr::NStat diff --git a/ydb/core/statistics/database/database.cpp b/ydb/core/statistics/database/database.cpp index 74201e1f24a9..56b012bf57b0 100644 --- a/ydb/core/statistics/database/database.cpp +++ b/ydb/core/statistics/database/database.cpp @@ -81,41 +81,41 @@ NActors::IActor* CreateStatisticsTableCreator(std::unique_ptr ColumnTags; - const std::vector Data; + const std::vector Items; public: - TSaveStatisticsQuery(const TString& database, const TPathId& pathId, ui64 statType, - const std::vector& columnTags, const std::vector& data) + TSaveStatisticsQuery( + const TString& database, const TPathId& pathId, std::vector items) : NKikimr::TQueryBase(NKikimrServices::STATISTICS, {}, database, true) , PathId(pathId) - , StatType(statType) - , ColumnTags(columnTags) - , Data(data) - { - Y_ABORT_UNLESS(ColumnTags.size() == Data.size()); - } + , Items(std::move(items)) + {} void OnRunQuery() override { TStringBuilder sql; sql << R"( DECLARE $owner_id AS Uint64; DECLARE $local_path_id AS Uint64; - DECLARE $stat_type AS Uint32; - DECLARE $column_tags AS List; + DECLARE $stat_types AS List; + DECLARE $column_tags AS List>; DECLARE $data AS List; + $to_struct = ($t) -> { + RETURN <| + owner_id:$owner_id, + local_path_id:$local_path_id, + stat_type:$t.0, + column_tag:$t.1, + data:$t.2, + |>; + }; + UPSERT INTO `.metadata/_statistics` (owner_id, local_path_id, stat_type, column_tag, data) - VALUES + SELECT owner_id, local_path_id, stat_type, column_tag, data FROM + AS_TABLE(ListMap(ListZip($stat_types, $column_tags, $data), $to_struct)); )"; - for (size_t i = 0; i < Data.size(); ++i) { - sql << " ($owner_id, $local_path_id, $stat_type, $column_tags[" << i << "], $data[" << i << "])"; - sql << (i == Data.size() - 1 ? ";" : ","); - } - NYdb::TParamsBuilder params; params .AddParam("$owner_id") @@ -123,22 +123,29 @@ class TSaveStatisticsQuery : public NKikimr::TQueryBase { .Build() .AddParam("$local_path_id") .Uint64(PathId.LocalPathId) - .Build() - .AddParam("$stat_type") - .Uint32(StatType) .Build(); + + auto& statTypes = params.AddParam("$stat_types").BeginList(); + for (const auto& item : Items) { + statTypes + .AddListItem() + .Uint32(item.Type); + } + statTypes.EndList().Build(); + auto& columnTags = params.AddParam("$column_tags").BeginList(); - for (size_t i = 0; i < ColumnTags.size(); ++i) { + for (const auto& item : Items) { columnTags .AddListItem() - .Uint32(ColumnTags[i]); + .OptionalUint32(item.ColumnTag); } columnTags.EndList().Build(); + auto& data = params.AddParam("$data").BeginList(); - for (size_t i = 0; i < Data.size(); ++i) { + for (const auto& item : Items) { data .AddListItem() - .String(Data[i]); + .String(item.Data); } data.EndList().Build(); @@ -162,23 +169,19 @@ class TSaveStatisticsRetryingQuery : public TActorBootstrapped ColumnTags; - const std::vector Data; + const std::vector Items; public: using TSaveRetryingQuery = TQueryRetryActor< TSaveStatisticsQuery, TEvStatistics::TEvSaveStatisticsQueryResponse, - const TString&, const TPathId&, ui64, const std::vector&, const std::vector&>; + const TString&, const TPathId&, const std::vector&>; TSaveStatisticsRetryingQuery(const NActors::TActorId& replyActorId, const TString& database, - const TPathId& pathId, ui64 statType, std::vector&& columnTags, std::vector&& data) + const TPathId& pathId, std::vector&& items) : ReplyActorId(replyActorId) , Database(database) , PathId(pathId) - , StatType(statType) - , ColumnTags(std::move(columnTags)) - , Data(std::move(data)) + , Items(std::move(items)) {} void Bootstrap() { @@ -188,7 +191,7 @@ class TSaveStatisticsRetryingQuery : public TActorBootstrapped::max(), TDuration::Seconds(1)), - Database, PathId, StatType, ColumnTags, Data + Database, PathId, std::move(Items) )); Become(&TSaveStatisticsRetryingQuery::StateFunc); } @@ -204,9 +207,9 @@ class TSaveStatisticsRetryingQuery : public TActorBootstrapped&& columnTags, std::vector&& data) + const TPathId& pathId, std::vector&& items) { - return new TSaveStatisticsRetryingQuery(replyActorId, database, pathId, statType, std::move(columnTags), std::move(data)); + return new TSaveStatisticsRetryingQuery(replyActorId, database, pathId, std::move(items)); } diff --git a/ydb/core/statistics/database/database.h b/ydb/core/statistics/database/database.h index 7e1ac275a6b9..242366ef09df 100644 --- a/ydb/core/statistics/database/database.h +++ b/ydb/core/statistics/database/database.h @@ -1,6 +1,8 @@ #pragma once #include +#include +#include #include namespace NKikimr::NStat { @@ -8,7 +10,7 @@ namespace NKikimr::NStat { NActors::IActor* CreateStatisticsTableCreator(std::unique_ptr event, const TString& database); NActors::IActor* CreateSaveStatisticsQuery(const NActors::TActorId& replyActorId, const TString& database, - const TPathId& pathId, ui64 statType, std::vector&& columnTags, std::vector&& data); + const TPathId& pathId, std::vector&& items); NActors::IActor* CreateLoadStatisticsQuery(const NActors::TActorId& replyActorId, const TString& database, const TPathId& pathId, ui64 statType, ui32 columnTag); diff --git a/ydb/core/statistics/database/ut/ut_database.cpp b/ydb/core/statistics/database/ut/ut_database.cpp index 31223497bca0..ab2e76f193a5 100644 --- a/ydb/core/statistics/database/ut/ut_database.cpp +++ b/ydb/core/statistics/database/ut/ut_database.cpp @@ -23,12 +23,13 @@ Y_UNIT_TEST_SUITE(StatisticsSaveLoad) { runtime.GrabEdgeEventRethrow(sender); TPathId pathId(1, 1); - ui64 statType = 1; - std::vector columnTags = {1, 2}; - std::vector data = {"dataA", "dataB"}; + EStatType statType = EStatType::COUNT_MIN_SKETCH; + std::vector statItems; + statItems.emplace_back(1, statType, "dataA"); + statItems.emplace_back(2, statType, "dataB"); runtime.Register(CreateSaveStatisticsQuery(sender, "/Root/Database", - pathId, statType, std::move(columnTags), std::move(data)), + pathId, std::move(statItems)), 0, 0, TMailboxType::Simple, 0, sender); auto saveResponse = runtime.GrabEdgeEventRethrow(sender); UNIT_ASSERT(saveResponse->Get()->Success); @@ -61,12 +62,13 @@ Y_UNIT_TEST_SUITE(StatisticsSaveLoad) { runtime.GrabEdgeEvent(sender); TPathId pathId(1, 1); - ui64 statType = 1; - std::vector columnTags = {1, 2}; - std::vector data = {"dataA", "dataB"}; + EStatType statType = EStatType::COUNT_MIN_SKETCH; + std::vector statItems; + statItems.emplace_back(1, statType, "dataA"); + statItems.emplace_back(2, statType, "dataB"); runtime.Register(CreateSaveStatisticsQuery(sender, "/Root/Database", - pathId, statType, std::move(columnTags), std::move(data)), + pathId, std::move(statItems)), 0, 0, TMailboxType::Simple, 0, sender); auto saveResponse = runtime.GrabEdgeEvent(sender); UNIT_ASSERT(saveResponse->Get()->Success); diff --git a/ydb/core/statistics/events.h b/ydb/core/statistics/events.h index b03a23b72b54..04e19c5ed4b6 100644 --- a/ydb/core/statistics/events.h +++ b/ydb/core/statistics/events.h @@ -44,6 +44,22 @@ struct TResponse { TStatCountMinSketch CountMinSketch; }; +// A single item of columnar statistics ready to be saved in the internal table. +struct TStatisticsItem { + TStatisticsItem( + std::optional columnTag, + EStatType type, + TString data) + : ColumnTag(columnTag) + , Type(type) + , Data(std::move(data)) + {} + + std::optional ColumnTag; + EStatType Type; + TString Data; +}; + struct TEvStatistics { enum EEv { EvGetStatistics = EventSpaceBegin(TKikimrEvents::ES_STATISTICS), @@ -283,6 +299,12 @@ struct TEvStatistics { TableNotFound, }; EStatus Status; + std::vector Statistics; + + explicit TEvFinishTraversal(std::vector statistics) + : Status(EStatus::Success) + , Statistics(std::move(statistics)) + {} explicit TEvFinishTraversal(EStatus status) : Status(status) {} }; From 51be969363fbce2aecac8254941685b7e122380b Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Mon, 1 Dec 2025 19:47:28 +0300 Subject: [PATCH 02/13] add proto msg for simple column statistics --- ydb/core/protos/statistics.proto | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ydb/core/protos/statistics.proto b/ydb/core/protos/statistics.proto index 5e1f2cf9cf18..bb8d3e058308 100644 --- a/ydb/core/protos/statistics.proto +++ b/ydb/core/protos/statistics.proto @@ -198,3 +198,9 @@ message TEvAggregateStatisticsResponse { } repeated TFailedTablet FailedTablets = 3; } + +message TSimpleColumnStatistics { + optional uint64 Count = 1; + optional uint64 CountNonNull = 2; + optional uint64 CountDistinct = 3; +}; From 9baf8ae85b4e5caa8ab9d72dc8c2b3af0b7455d7 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Mon, 1 Dec 2025 19:59:38 +0300 Subject: [PATCH 03/13] calculate simple column statistics (count distinct) --- .../statistics/aggregator/analyze_actor.cpp | 79 +++++++++++++------ .../statistics/aggregator/analyze_actor.h | 15 +++- ydb/core/statistics/aggregator/ut/ya.make | 2 + ydb/core/statistics/events.h | 2 +- ydb/core/statistics/service/ut/ya.make | 2 + 5 files changed, 69 insertions(+), 31 deletions(-) diff --git a/ydb/core/statistics/aggregator/analyze_actor.cpp b/ydb/core/statistics/aggregator/analyze_actor.cpp index a4014b3c64fe..971250dd38b2 100644 --- a/ydb/core/statistics/aggregator/analyze_actor.cpp +++ b/ydb/core/statistics/aggregator/analyze_actor.cpp @@ -215,7 +215,6 @@ void TAnalyzeActor::FinishWithFailure(TEvStatistics::TEvFinishTraversal::EStatus PassAway(); } - void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { const auto& request = *ev->Get()->Request; Y_ABORT_UNLESS(request.ResultSet.size() == 1); @@ -245,7 +244,16 @@ void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& TSelectBuilder builder; // TODO: many statistics types - builder.AddBuiltinAggregation({}, "count"); + CountSeq = builder.AddBuiltinAggregation({}, "count"); + + auto addColumn = [&](ui32 tag, const TString& name) { + Columns.emplace_back(tag); + // TODO: escape column names + Columns.back().CountDistinctSeq = builder.AddBuiltinAggregation( + name, "HLL"); + Columns.back().CmsSeq = builder.AddUDAFAggregation( + name, "CountMinSketch", CMS_WIDTH, CMS_DEPTH); + }; if (!ColumnTags.empty()) { for (const auto& colTag : ColumnTags) { auto colIt = columnNames.find(colTag); @@ -253,17 +261,11 @@ void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& // Column probably already deleted, skip it. continue; } - - // TODO: escape column names - Columns.emplace_back( - colTag, - builder.AddUDAFAggregation(colIt->second, "CountMinSketch", CMS_WIDTH, CMS_DEPTH)); + addColumn(colTag, colIt->second); } } else { for (const auto& [tag, name] : columnNames) { - Columns.emplace_back( - tag, - builder.AddUDAFAggregation(name, "CountMinSketch", CMS_WIDTH, CMS_DEPTH)); + addColumn(tag, name); } } @@ -273,6 +275,35 @@ void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& Register(actor.release()); } +TString TAnalyzeActor::TColumnDesc::ExtractSimpleStats( + ui64 count, const TVector& aggColumns) const { + NKikimrStat::TSimpleColumnStatistics result; + result.SetCount(count); + + NYdb::TValueParser hllVal(aggColumns.at(CountDistinctSeq.value())); + ui64 countDistinct = hllVal.GetOptionalUint64().value_or(0); + result.SetCountDistinct(countDistinct); + + return result.SerializeAsString(); +} + +TString TAnalyzeActor::TColumnDesc::ExtractCMS(const TVector& aggColumns) const { + TString data; + NYdb::TValueParser val(aggColumns.at(CmsSeq.value())); + val.OpenOptional(); + if (!val.IsNull()) { + const auto& bytes = val.GetBytes(); + data.assign(bytes.data(), bytes.size()); + } else { + auto defaultVal = std::unique_ptr( + TCountMinSketch::Create(CMS_WIDTH, CMS_DEPTH)); + auto bytes = defaultVal->AsStringBuf(); + data.assign(bytes.data(), bytes.size()); + } + return data; +} + + void TAnalyzeActor::Handle(TEvPrivate::TEvAnalyzeScanResult::TPtr& ev) { const auto& result = *ev->Get(); if (result.Status != Ydb::StatusIds::SUCCESS) { @@ -281,25 +312,21 @@ void TAnalyzeActor::Handle(TEvPrivate::TEvAnalyzeScanResult::TPtr& ev) { } std::vector items; - for (const auto& col : Columns) { - TString data; - - NYdb::TValueParser val(result.AggColumns.at(col.Seq)); - val.OpenOptional(); - if (!val.IsNull()) { - const auto& bytes = val.GetBytes(); - data.assign(bytes.data(), bytes.size()); - } else { - auto defaultVal = std::unique_ptr( - TCountMinSketch::Create(CMS_WIDTH, CMS_DEPTH)); - auto bytes = defaultVal->AsStringBuf(); - data.assign(bytes.data(), bytes.size()); - } + NYdb::TValueParser val(result.AggColumns.at(CountSeq.value())); + ui64 rowCount = val.GetUint64(); + for (const auto& col : Columns) { items.emplace_back( col.Tag, - NKikimr::NStat::COUNT_MIN_SKETCH, - data); + NKikimr::NStat::SIMPLE_COLUMN, + col.ExtractSimpleStats(rowCount, result.AggColumns)); + + if (col.CmsSeq) { + items.emplace_back( + col.Tag, + NKikimr::NStat::COUNT_MIN_SKETCH, + col.ExtractCMS(result.AggColumns)); + } } auto response = std::make_unique(std::move(items)); diff --git a/ydb/core/statistics/aggregator/analyze_actor.h b/ydb/core/statistics/aggregator/analyze_actor.h index 2480f6393311..3c363bf3f5a7 100644 --- a/ydb/core/statistics/aggregator/analyze_actor.h +++ b/ydb/core/statistics/aggregator/analyze_actor.h @@ -23,13 +23,20 @@ class TAnalyzeActor : public NActors::TActorBootstrapped { // StateQuery struct TColumnDesc { - explicit TColumnDesc(ui32 tag, ui32 seq) - : Tag(tag), Seq(seq) + ui32 Tag; + std::optional CountDistinctSeq; + std::optional CmsSeq; + + explicit TColumnDesc(ui32 tag) + : Tag(tag) {} - ui32 Tag; - ui32 Seq; + TString ExtractSimpleStats(ui64 count, const TVector& aggColumns) const; + // Precondition: CmsSeq is non-null. + TString ExtractCMS(const TVector& aggColumns) const; }; + + std::optional CountSeq; TVector Columns; struct TEvPrivate { diff --git a/ydb/core/statistics/aggregator/ut/ya.make b/ydb/core/statistics/aggregator/ut/ya.make index f78288a21da6..10148e239b49 100644 --- a/ydb/core/statistics/aggregator/ut/ya.make +++ b/ydb/core/statistics/aggregator/ut/ya.make @@ -19,6 +19,8 @@ PEERDIR( ydb/core/protos ydb/core/testlib/default ydb/core/statistics/ut_common + yql/essentials/udfs/common/digest + yql/essentials/udfs/common/hyperloglog ) SRCS( diff --git a/ydb/core/statistics/events.h b/ydb/core/statistics/events.h index 04e19c5ed4b6..28be1c3bbebe 100644 --- a/ydb/core/statistics/events.h +++ b/ydb/core/statistics/events.h @@ -27,7 +27,7 @@ struct TStatCountMinSketch { enum EStatType { SIMPLE = 0, - HYPER_LOG_LOG = 1, + SIMPLE_COLUMN = 1, COUNT_MIN_SKETCH = 2, }; diff --git a/ydb/core/statistics/service/ut/ya.make b/ydb/core/statistics/service/ut/ya.make index 4a59fbd25d28..a8773bd19fcc 100644 --- a/ydb/core/statistics/service/ut/ya.make +++ b/ydb/core/statistics/service/ut/ya.make @@ -19,6 +19,8 @@ PEERDIR( ydb/core/protos ydb/core/testlib/default ydb/core/statistics/ut_common + yql/essentials/udfs/common/digest + yql/essentials/udfs/common/hyperloglog ) SRCS( From 6355776d3f27840d9495ca7fe754fa435b805f97 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Tue, 2 Dec 2025 19:30:27 +0300 Subject: [PATCH 04/13] save column type information --- .../statistics/aggregator/analyze_actor.cpp | 22 +++++++++---------- .../statistics/aggregator/analyze_actor.h | 4 +++- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/ydb/core/statistics/aggregator/analyze_actor.cpp b/ydb/core/statistics/aggregator/analyze_actor.cpp index 971250dd38b2..b8bdc5ee12e1 100644 --- a/ydb/core/statistics/aggregator/analyze_actor.cpp +++ b/ydb/core/statistics/aggregator/analyze_actor.cpp @@ -236,9 +236,9 @@ void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& // TODO: escape table path auto table = "/" + JoinVectorIntoString(entry.Path, "/"); - THashMap columnNames; + THashMap tag2Column; for (const auto& col : entry.Columns) { - columnNames[col.second.Id] = col.second.Name; + tag2Column[col.second.Id] = col.second; } TSelectBuilder builder; @@ -246,26 +246,26 @@ void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& // TODO: many statistics types CountSeq = builder.AddBuiltinAggregation({}, "count"); - auto addColumn = [&](ui32 tag, const TString& name) { - Columns.emplace_back(tag); + auto addColumn = [&](const TSysTables::TTableColumnInfo& colInfo) { + Columns.emplace_back(colInfo.Id, colInfo.PType); // TODO: escape column names Columns.back().CountDistinctSeq = builder.AddBuiltinAggregation( - name, "HLL"); + colInfo.Name, "HLL"); Columns.back().CmsSeq = builder.AddUDAFAggregation( - name, "CountMinSketch", CMS_WIDTH, CMS_DEPTH); + colInfo.Name, "CountMinSketch", CMS_WIDTH, CMS_DEPTH); }; if (!ColumnTags.empty()) { for (const auto& colTag : ColumnTags) { - auto colIt = columnNames.find(colTag); - if (colIt == columnNames.end()) { + auto colIt = tag2Column.find(colTag); + if (colIt == tag2Column.end()) { // Column probably already deleted, skip it. continue; } - addColumn(colTag, colIt->second); + addColumn(colIt->second); } } else { - for (const auto& [tag, name] : columnNames) { - addColumn(tag, name); + for (const auto& [tag, info] : tag2Column) { + addColumn(info); } } diff --git a/ydb/core/statistics/aggregator/analyze_actor.h b/ydb/core/statistics/aggregator/analyze_actor.h index 3c363bf3f5a7..465e22c04673 100644 --- a/ydb/core/statistics/aggregator/analyze_actor.h +++ b/ydb/core/statistics/aggregator/analyze_actor.h @@ -24,11 +24,13 @@ class TAnalyzeActor : public NActors::TActorBootstrapped { struct TColumnDesc { ui32 Tag; + NScheme::TTypeInfo Type; std::optional CountDistinctSeq; std::optional CmsSeq; - explicit TColumnDesc(ui32 tag) + explicit TColumnDesc(ui32 tag, NScheme::TTypeInfo type) : Tag(tag) + , Type(type) {} TString ExtractSimpleStats(ui64 count, const TVector& aggColumns) const; From 7970ffab14c5a244914796fd8ec6a8411cf62a7b Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Tue, 2 Dec 2025 15:56:50 +0300 Subject: [PATCH 05/13] 2-stage analyze with adaptive count-min sketch params --- .../statistics/aggregator/analyze_actor.cpp | 178 +++++++++++++----- .../statistics/aggregator/analyze_actor.h | 55 +++++- 2 files changed, 179 insertions(+), 54 deletions(-) diff --git a/ydb/core/statistics/aggregator/analyze_actor.cpp b/ydb/core/statistics/aggregator/analyze_actor.cpp index b8bdc5ee12e1..af8081a2cbd1 100644 --- a/ydb/core/statistics/aggregator/analyze_actor.cpp +++ b/ydb/core/statistics/aggregator/analyze_actor.cpp @@ -1,8 +1,10 @@ +#include #include #include #include #include +#include #include #include @@ -127,6 +129,78 @@ class TSelectBuilder { TVector Columns; }; +class TCMSEval : public IColumnStatisticEval { + ui64 Width; + ui64 Depth = DEFAULT_DEPTH; + std::optional Seq; + + static constexpr ui64 MIN_WIDTH = 256; + static constexpr ui64 DEFAULT_DEPTH = 8; + + TCMSEval(ui64 width) : Width(width) {} +public: + static std::optional MaybeCreate( + const NKikimrStat::TSimpleColumnStatistics& simpleStats, + const NScheme::TTypeInfo&) { + if (simpleStats.GetCountDistinct() >= 0.8 * simpleStats.GetCount()) { + return std::nullopt; + } + + const double n = simpleStats.GetCount(); + const double ndv = simpleStats.GetCountDistinct(); + if (ndv == 0) { + return TCMSEval(MIN_WIDTH); + } + + const double c = 10; + const double eps = (c - 1) * (1 + std::log10(n / ndv)) / ndv; + const ui64 cmsWidth = std::max((ui64)MIN_WIDTH, (ui64)ceil(std::numbers::e_v / eps)); + return TCMSEval(cmsWidth); + } + + EStatType GetType() const final { return EStatType::COUNT_MIN_SKETCH; } + + size_t EstimateSize() const final { return Width * Depth * sizeof(ui32); } + + void AddAggregations(const TString& columnName, TSelectBuilder& builder) final { + Seq = builder.AddUDAFAggregation(columnName, "CountMinSketch", Width, Depth); + } + + TString ExtractData(const TVector& aggColumns) const final { + NYdb::TValueParser val(aggColumns.at(Seq.value())); + val.OpenOptional(); + if (!val.IsNull()) { + const auto& bytes = val.GetBytes(); + return TString(bytes.data(), bytes.size()); + } else { + auto defaultVal = std::unique_ptr( + TCountMinSketch::Create(Width, Depth)); + auto bytes = defaultVal->AsStringBuf(); + return TString(bytes.data(), bytes.size()); + } + } +}; + +TVector IColumnStatisticEval::SupportedTypes() { + return { EStatType::COUNT_MIN_SKETCH }; +} + +IColumnStatisticEval::TPtr IColumnStatisticEval::MaybeCreate( + EStatType statType, + const NKikimrStat::TSimpleColumnStatistics& simpleStats, + const NScheme::TTypeInfo& columnType) { + switch (statType) { + case EStatType::COUNT_MIN_SKETCH: { + auto maybeEval = TCMSEval::MaybeCreate(simpleStats, columnType); + if (!maybeEval) { + return TPtr{}; + } + return std::make_unique(std::move(*maybeEval)); + } + default: + return TPtr{}; + } +} class TAnalyzeActor::TScanActor : public TQueryBase { public: @@ -190,9 +264,6 @@ class TAnalyzeActor::TScanActor : public TQueryBase { bool ResponseSent = false; }; -static constexpr ui64 CMS_WIDTH = 256; -static constexpr ui64 CMS_DEPTH = 8; - void TAnalyzeActor::Bootstrap() { Become(&TThis::StateNavigate); @@ -234,28 +305,25 @@ void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& } // TODO: escape table path - auto table = "/" + JoinVectorIntoString(entry.Path, "/"); + TableName = "/" + JoinVectorIntoString(entry.Path, "/"); THashMap tag2Column; for (const auto& col : entry.Columns) { tag2Column[col.second.Id] = col.second; } - TSelectBuilder builder; + TSelectBuilder stage1Builder; - // TODO: many statistics types - CountSeq = builder.AddBuiltinAggregation({}, "count"); + CountSeq = stage1Builder.AddBuiltinAggregation({}, "count"); auto addColumn = [&](const TSysTables::TTableColumnInfo& colInfo) { - Columns.emplace_back(colInfo.Id, colInfo.PType); + Columns.emplace_back(colInfo.Id, colInfo.PType, colInfo.Name); // TODO: escape column names - Columns.back().CountDistinctSeq = builder.AddBuiltinAggregation( + Columns.back().CountDistinctSeq = stage1Builder.AddBuiltinAggregation( colInfo.Name, "HLL"); - Columns.back().CmsSeq = builder.AddUDAFAggregation( - colInfo.Name, "CountMinSketch", CMS_WIDTH, CMS_DEPTH); }; - if (!ColumnTags.empty()) { - for (const auto& colTag : ColumnTags) { + if (!RequestedColumnTags.empty()) { + for (const auto& colTag : RequestedColumnTags) { auto colIt = tag2Column.find(colTag); if (colIt == tag2Column.end()) { // Column probably already deleted, skip it. @@ -269,13 +337,13 @@ void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& } } - Become(&TThis::StateQuery); + Become(&TThis::StateQueryStage1); auto actor = std::make_unique( - SelfId(), DatabaseName, builder.Build(table), builder.ColumnCount()); + SelfId(), DatabaseName, stage1Builder.Build(TableName), stage1Builder.ColumnCount()); Register(actor.release()); } -TString TAnalyzeActor::TColumnDesc::ExtractSimpleStats( +NKikimrStat::TSimpleColumnStatistics TAnalyzeActor::TColumnDesc::ExtractSimpleStats( ui64 count, const TVector& aggColumns) const { NKikimrStat::TSimpleColumnStatistics result; result.SetCount(count); @@ -284,52 +352,74 @@ TString TAnalyzeActor::TColumnDesc::ExtractSimpleStats( ui64 countDistinct = hllVal.GetOptionalUint64().value_or(0); result.SetCountDistinct(countDistinct); - return result.SerializeAsString(); + return result; } -TString TAnalyzeActor::TColumnDesc::ExtractCMS(const TVector& aggColumns) const { - TString data; - NYdb::TValueParser val(aggColumns.at(CmsSeq.value())); - val.OpenOptional(); - if (!val.IsNull()) { - const auto& bytes = val.GetBytes(); - data.assign(bytes.data(), bytes.size()); - } else { - auto defaultVal = std::unique_ptr( - TCountMinSketch::Create(CMS_WIDTH, CMS_DEPTH)); - auto bytes = defaultVal->AsStringBuf(); - data.assign(bytes.data(), bytes.size()); - } - return data; -} - - -void TAnalyzeActor::Handle(TEvPrivate::TEvAnalyzeScanResult::TPtr& ev) { +void TAnalyzeActor::HandleStage1(TEvPrivate::TEvAnalyzeScanResult::TPtr& ev) { const auto& result = *ev->Get(); if (result.Status != Ydb::StatusIds::SUCCESS) { FinishWithFailure(TEvStatistics::TEvFinishTraversal::EStatus::InternalError); return; } - std::vector items; NYdb::TValueParser val(result.AggColumns.at(CountSeq.value())); ui64 rowCount = val.GetUint64(); - for (const auto& col : Columns) { - items.emplace_back( + auto supportedStatTypes = IColumnStatisticEval::SupportedTypes(); + + TSelectBuilder stage2Builder; + for (auto& col : Columns) { + auto simpleStats = col.ExtractSimpleStats(rowCount, result.AggColumns); + Results.emplace_back( col.Tag, NKikimr::NStat::SIMPLE_COLUMN, - col.ExtractSimpleStats(rowCount, result.AggColumns)); + simpleStats.SerializeAsString()); + + for (auto type : supportedStatTypes) { + auto statEval = IColumnStatisticEval::MaybeCreate(type, simpleStats, col.Type); + if (!statEval) { + continue; + } + if (statEval->EstimateSize() >= 4_MB) { + // To avoid: Error: ydb/library/yql/dq/runtime/dq_output_channel.cpp:405: Row data size is too big: 53839241 bytes, exceeds limit of 50331648 bytes + continue; + } + statEval->AddAggregations(col.Name, stage2Builder); + col.Statistics.push_back(std::move(statEval)); + } + } + + if (stage2Builder.ColumnCount() == 0) { + // Second stage is not needed, return results right away. + auto response = std::make_unique(std::move(Results)); + Send(Parent, response.release()); + PassAway(); + return; + } - if (col.CmsSeq) { - items.emplace_back( + Become(&TThis::StateQueryStage2); + auto actor = std::make_unique( + SelfId(), DatabaseName, stage2Builder.Build(TableName), stage2Builder.ColumnCount()); + Register(actor.release()); +} + +void TAnalyzeActor::HandleStage2(TEvPrivate::TEvAnalyzeScanResult::TPtr& ev) { + const auto& result = *ev->Get(); + if (result.Status != Ydb::StatusIds::SUCCESS) { + FinishWithFailure(TEvStatistics::TEvFinishTraversal::EStatus::InternalError); + return; + } + + for (const auto& col : Columns) { + for (const auto& statEval : col.Statistics) { + Results.emplace_back( col.Tag, - NKikimr::NStat::COUNT_MIN_SKETCH, - col.ExtractCMS(result.AggColumns)); + statEval->GetType(), + statEval->ExtractData(result.AggColumns)); } } - auto response = std::make_unique(std::move(items)); + auto response = std::make_unique(std::move(Results)); Send(Parent, response.release()); PassAway(); } diff --git a/ydb/core/statistics/aggregator/analyze_actor.h b/ydb/core/statistics/aggregator/analyze_actor.h index 465e22c04673..8d78d66bbb51 100644 --- a/ydb/core/statistics/aggregator/analyze_actor.h +++ b/ydb/core/statistics/aggregator/analyze_actor.h @@ -7,12 +7,31 @@ namespace NKikimr::NStat { +class TSelectBuilder; + +class IColumnStatisticEval { +public: + using TPtr = std::unique_ptr; + + static TVector SupportedTypes(); + static TPtr MaybeCreate( + EStatType, + const NKikimrStat::TSimpleColumnStatistics&, + const NScheme::TTypeInfo&); + + virtual EStatType GetType() const = 0; + virtual size_t EstimateSize() const = 0; + virtual void AddAggregations(const TString& columnName, TSelectBuilder&) = 0; + virtual TString ExtractData(const TVector& aggColumns) const = 0; + virtual ~IColumnStatisticEval() = default; +}; + class TAnalyzeActor : public NActors::TActorBootstrapped { TActorId Parent; TString OperationId; TString DatabaseName; TPathId PathId; - TVector ColumnTags; + TVector RequestedColumnTags; void FinishWithFailure(TEvStatistics::TEvFinishTraversal::EStatus); @@ -25,21 +44,27 @@ class TAnalyzeActor : public NActors::TActorBootstrapped { struct TColumnDesc { ui32 Tag; NScheme::TTypeInfo Type; + TString Name; + std::optional CountDistinctSeq; - std::optional CmsSeq; + TVector Statistics; - explicit TColumnDesc(ui32 tag, NScheme::TTypeInfo type) + explicit TColumnDesc(ui32 tag, NScheme::TTypeInfo type, TString name) : Tag(tag) , Type(type) + , Name(std::move(name)) {} + TColumnDesc(TColumnDesc&&) noexcept = default; + TColumnDesc& operator=(TColumnDesc&) noexcept = default; - TString ExtractSimpleStats(ui64 count, const TVector& aggColumns) const; - // Precondition: CmsSeq is non-null. - TString ExtractCMS(const TVector& aggColumns) const; + NKikimrStat::TSimpleColumnStatistics ExtractSimpleStats( + ui64 count, const TVector& aggColumns) const; }; + TString TableName; std::optional CountSeq; TVector Columns; + TVector Results; struct TEvPrivate { enum EEv { @@ -74,7 +99,8 @@ class TAnalyzeActor : public NActors::TActorBootstrapped { class TScanActor; - void Handle(TEvPrivate::TEvAnalyzeScanResult::TPtr& ev); + void HandleStage1(TEvPrivate::TEvAnalyzeScanResult::TPtr& ev); + void HandleStage2(TEvPrivate::TEvAnalyzeScanResult::TPtr& ev); public: TAnalyzeActor( @@ -87,7 +113,7 @@ class TAnalyzeActor : public NActors::TActorBootstrapped { , OperationId(std::move(operationId)) , DatabaseName(std::move(databaseName)) , PathId(std::move(pathId)) - , ColumnTags(std::move(columnTags)) + , RequestedColumnTags(std::move(columnTags)) {} void Bootstrap(); @@ -98,9 +124,18 @@ class TAnalyzeActor : public NActors::TActorBootstrapped { } } - STFUNC(StateQuery) { + // Calculate simple column statistics (count, count distinct) and + // determine the parameters for heavy statistics. + STFUNC(StateQueryStage1) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvPrivate::TEvAnalyzeScanResult, HandleStage1); + } + } + + // Calculate "heavy" statistics requested from stage 1. + STFUNC(StateQueryStage2) { switch (ev->GetTypeRewrite()) { - hFunc(TEvPrivate::TEvAnalyzeScanResult, Handle); + hFunc(TEvPrivate::TEvAnalyzeScanResult, HandleStage2); } } }; From e71469ed627f72778874a1f7e548dd653af98def Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Wed, 3 Dec 2025 17:24:43 +0300 Subject: [PATCH 06/13] move TSelectBuilder to a separate file --- .../statistics/aggregator/analyze_actor.cpp | 124 +----------------- .../statistics/aggregator/select_builder.cpp | 79 +++++++++++ .../statistics/aggregator/select_builder.h | 67 ++++++++++ ydb/core/statistics/aggregator/ya.make | 2 + 4 files changed, 151 insertions(+), 121 deletions(-) create mode 100644 ydb/core/statistics/aggregator/select_builder.cpp create mode 100644 ydb/core/statistics/aggregator/select_builder.h diff --git a/ydb/core/statistics/aggregator/analyze_actor.cpp b/ydb/core/statistics/aggregator/analyze_actor.cpp index af8081a2cbd1..ff804e857322 100644 --- a/ydb/core/statistics/aggregator/analyze_actor.cpp +++ b/ydb/core/statistics/aggregator/analyze_actor.cpp @@ -1,5 +1,5 @@ -#include -#include +#include "analyze_actor.h" +#include "select_builder.h" #include #include @@ -7,128 +7,10 @@ #include #include -#include +#include namespace NKikimr::NStat { -class TSelectBuilder { -public: - ui32 AddBuiltinAggregation(std::optional columnName, TString aggName) { - auto column = TAggColumn{ - .Seq = static_cast(Columns.size()), - .ColumnName = std::move(columnName), - .AggName = std::move(aggName), - }; - Columns.push_back(std::move(column)); - return Columns.back().Seq; - } - - template - ui32 AddUDAFAggregation(TString columnName, const TStringBuf& udafName, TArgs&&... params) { - auto factory = AddFactory(udafName); - - // TODO: parameters escaping/binding - TString paramsStr = Join(',', params...); - - auto column = TAggColumn{ - .Seq = static_cast(Columns.size()), - .ColumnName = std::move(columnName), - .UdafFactory = factory, - .Params = std::move(paramsStr), - }; - Columns.push_back(std::move(column)); - return Columns.back().Seq; - } - - TString Build(const TStringBuf& table) const { - TStringBuilder res; - for (const auto& [udaf, factory] : Udaf2Factory) { - TStringBuilder paramsStr; - for (size_t i = 0; i < factory.ParamCount; ++i) { - if (i > 0) { - paramsStr << ","; - } - paramsStr << "$p" << i; - } - - res << std::format(R"($f{0} = ({2}) -> {{ return AggregationFactory( - "UDAF", - ($item,$parent) -> {{ return Udf(StatisticsInternal::{1}Create, $parent as Depends)($item,{2}) }}, - ($state,$item,$parent) -> {{ return Udf(StatisticsInternal::{1}AddValue, $parent as Depends)($state, $item) }}, - StatisticsInternal::{1}Merge, - StatisticsInternal::{1}Finalize, - StatisticsInternal::{1}Serialize, - StatisticsInternal::{1}Deserialize, - ) -}}; -)", - factory.Id, std::string_view(factory.Udaf), std::string_view(paramsStr)); - } - - res << "SELECT "; - bool first = true; - for (const auto& agg : Columns ) { - if (first) { - first = false; - } else { - res << ","; - } - if (agg.UdafFactory) { - Y_ABORT_UNLESS(agg.ColumnName); - res << "AGGREGATE_BY(" << agg.ColumnName - << "," << "$f" << *agg.UdafFactory << "(" << agg.Params << "))"; - } else { - Y_ABORT_UNLESS(agg.AggName); - res << *agg.AggName; - if (agg.ColumnName) { - res << "(" << *agg.ColumnName << ")"; - } else { - res << "(*)"; - } - } - } - - res << " FROM `" << table << "`"; - return res; - } - - size_t ColumnCount() const { - return Columns.size(); - } - -private: - ui32 AddFactory(const TStringBuf& udafName) { - // TODO: check UDAF existence, determine paramcount - ui32 curId = Udaf2Factory.size(); - size_t paramCount = 2; - auto [it, emplaced] = Udaf2Factory.try_emplace(udafName, curId, udafName, paramCount); - return it->second.Id; - } - -private: - struct TFactory { - TFactory(ui32 id, const TStringBuf& udaf, size_t paramCount) - : Id(id), Udaf(udaf), ParamCount(paramCount) - {} - - ui32 Id = 0; - TString Udaf; - size_t ParamCount = 0; - }; - - THashMap Udaf2Factory; - - struct TAggColumn { - ui32 Seq = 0; - std::optional ColumnName; - std::optional AggName; - std::optional UdafFactory; - TString Params; - }; - - TVector Columns; -}; - class TCMSEval : public IColumnStatisticEval { ui64 Width; ui64 Depth = DEFAULT_DEPTH; diff --git a/ydb/core/statistics/aggregator/select_builder.cpp b/ydb/core/statistics/aggregator/select_builder.cpp new file mode 100644 index 000000000000..907d74e7474e --- /dev/null +++ b/ydb/core/statistics/aggregator/select_builder.cpp @@ -0,0 +1,79 @@ +#include "select_builder.h" + +#include + +#include + +namespace NKikimr::NStat { + +ui32 TSelectBuilder::AddBuiltinAggregation(std::optional columnName, TString aggName) { + auto column = TAggColumn{ + .Seq = static_cast(Columns.size()), + .ColumnName = std::move(columnName), + .AggName = std::move(aggName), + }; + Columns.push_back(std::move(column)); + return Columns.back().Seq; +} + +ui32 TSelectBuilder::AddFactory(const TStringBuf& udafName) { + // TODO: check UDAF existence, determine paramcount + ui32 curId = Udaf2Factory.size(); + size_t paramCount = 2; + auto [it, emplaced] = Udaf2Factory.try_emplace(udafName, curId, udafName, paramCount); + return it->second.Id; +} + +TString TSelectBuilder::Build(const TStringBuf& table) const { + TStringBuilder res; + for (const auto& [udaf, factory] : Udaf2Factory) { + TStringBuilder paramsStr; + for (size_t i = 0; i < factory.ParamCount; ++i) { + if (i > 0) { + paramsStr << ","; + } + paramsStr << "$p" << i; + } + + res << std::format(R"($f{0} = ({2}) -> {{ return AggregationFactory( + "UDAF", + ($item,$parent) -> {{ return Udf(StatisticsInternal::{1}Create, $parent as Depends)($item,{2}) }}, + ($state,$item,$parent) -> {{ return Udf(StatisticsInternal::{1}AddValue, $parent as Depends)($state, $item) }}, + StatisticsInternal::{1}Merge, + StatisticsInternal::{1}Finalize, + StatisticsInternal::{1}Serialize, + StatisticsInternal::{1}Deserialize, +) +}}; +)", + factory.Id, std::string_view(factory.Udaf), std::string_view(paramsStr)); + } + + res << "SELECT "; + bool first = true; + for (const auto& agg : Columns ) { + if (first) { + first = false; + } else { + res << ","; + } + if (agg.UdafFactory) { + Y_ABORT_UNLESS(agg.ColumnName); + res << "AGGREGATE_BY(" << agg.ColumnName + << "," << "$f" << *agg.UdafFactory << "(" << agg.Params << "))"; + } else { + Y_ABORT_UNLESS(agg.AggName); + res << *agg.AggName; + if (agg.ColumnName) { + res << "(" << *agg.ColumnName << ")"; + } else { + res << "(*)"; + } + } + } + + res << " FROM `" << table << "`"; + return res; +} + +} // NKikimr::NStat diff --git a/ydb/core/statistics/aggregator/select_builder.h b/ydb/core/statistics/aggregator/select_builder.h new file mode 100644 index 000000000000..d866963e9bfb --- /dev/null +++ b/ydb/core/statistics/aggregator/select_builder.h @@ -0,0 +1,67 @@ +#pragma once + +#include +#include +#include + +namespace NKikimr::NStat { + +// Class that is used to build internal SELECT queries used to calculate column statistics. +class TSelectBuilder { +public: + ui32 AddBuiltinAggregation(std::optional columnName, TString aggName); + + template + ui32 AddUDAFAggregation(TString columnName, const TStringBuf& udafName, TArgs&&... params); + + TString Build(const TStringBuf& table) const; + + size_t ColumnCount() const { + return Columns.size(); + } + +private: + ui32 AddFactory(const TStringBuf& udafName); + +private: + struct TFactory { + TFactory(ui32 id, const TStringBuf& udaf, size_t paramCount) + : Id(id), Udaf(udaf), ParamCount(paramCount) + {} + + ui32 Id = 0; + TString Udaf; + size_t ParamCount = 0; + }; + + THashMap Udaf2Factory; + + struct TAggColumn { + ui32 Seq = 0; + std::optional ColumnName; + std::optional AggName; + std::optional UdafFactory; + TString Params; + }; + + TVector Columns; +}; + +template +ui32 TSelectBuilder::AddUDAFAggregation(TString columnName, const TStringBuf& udafName, TArgs&&... params) { + auto factory = AddFactory(udafName); + + // TODO: parameters escaping/binding + TString paramsStr = Join(',', params...); + + auto column = TAggColumn{ + .Seq = static_cast(Columns.size()), + .ColumnName = std::move(columnName), + .UdafFactory = factory, + .Params = std::move(paramsStr), + }; + Columns.push_back(std::move(column)); + return Columns.back().Seq; +} + +} // NKikimr::NStat diff --git a/ydb/core/statistics/aggregator/ya.make b/ydb/core/statistics/aggregator/ya.make index 5bc717aab832..6904efe2a35f 100644 --- a/ydb/core/statistics/aggregator/ya.make +++ b/ydb/core/statistics/aggregator/ya.make @@ -9,6 +9,8 @@ SRCS( analyze_actor.cpp schema.h schema.cpp + select_builder.h + select_builder.cpp tx_ack_timeout.cpp tx_aggr_stat_response.cpp tx_analyze.cpp From f6bfb00b6fdf62042d5a32d9a5f9bc3ba34a4648 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Wed, 3 Dec 2025 17:51:49 +0300 Subject: [PATCH 07/13] add clarifying comment --- ydb/core/statistics/aggregator/aggregator_impl.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ydb/core/statistics/aggregator/aggregator_impl.cpp b/ydb/core/statistics/aggregator/aggregator_impl.cpp index 66c58f726700..c2a4c36134fc 100644 --- a/ydb/core/statistics/aggregator/aggregator_impl.cpp +++ b/ydb/core/statistics/aggregator/aggregator_impl.cpp @@ -675,6 +675,8 @@ void TStatisticsAggregator::ScheduleNextAnalyze(NIceDb::TNiceDb& db, const TActo UpdateForceTraversalTableStatus( TForceTraversalTable::EStatus::AnalyzeStarted, operation.OperationId, operationTable, db); + // operation.Types field is not used, TAnalyzeActor will determine suitable + // statistic types itself. ctx.RegisterWithSameMailbox(new TAnalyzeActor( SelfId(), operation.OperationId, operation.DatabaseName, operationTable.PathId, operationTable.ColumnTags)); From 1f8c255cee8f442f8c6598c361c6e43fd6478d3c Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Thu, 4 Dec 2025 15:14:33 +0300 Subject: [PATCH 08/13] change Value column type to String --- ydb/core/statistics/service/ut/ut_column_statistics.cpp | 4 ++-- ydb/core/statistics/service/ut/ut_http_request.cpp | 2 +- ydb/core/statistics/ut_common/ut_common.cpp | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ydb/core/statistics/service/ut/ut_column_statistics.cpp b/ydb/core/statistics/service/ut/ut_column_statistics.cpp index 005fc78b848d..ffa53204e6d5 100644 --- a/ydb/core/statistics/service/ut/ut_column_statistics.cpp +++ b/ydb/core/statistics/service/ut/ut_column_statistics.cpp @@ -74,7 +74,7 @@ Y_UNIT_TEST_SUITE(ColumnStatistics) { std::vector expected = { { - .Tag = 1, + .Tag = 1, // Key column .Probes{ {1, 4}, {2, 4} } } }; @@ -104,7 +104,7 @@ Y_UNIT_TEST_SUITE(ColumnStatistics) { auto sender = runtime.AllocateEdgeActor(); std::vector expected = { { - .Tag = 1, + .Tag = 1, // Key column .Probes{ {1, 4}, {2, 4} } } }; diff --git a/ydb/core/statistics/service/ut/ut_http_request.cpp b/ydb/core/statistics/service/ut/ut_http_request.cpp index 8c0b2e714094..a2ea215d6836 100644 --- a/ydb/core/statistics/service/ut/ut_http_request.cpp +++ b/ydb/core/statistics/service/ut/ut_http_request.cpp @@ -62,7 +62,7 @@ void ProbeTest(bool isServerless) { runtime.Register(new THttpRequest(THttpRequest::ERequestType::PROBE_COUNT_MIN_SKETCH, { { THttpRequest::EParamType::PATH, tableInfo.Path }, { THttpRequest::EParamType::COLUMN_NAME, columnName }, - { THttpRequest::EParamType::CELL_VALUE, "1" } + { THttpRequest::EParamType::CELL_VALUE, "\"1\"" } }, THttpRequest::EResponseContentType::HTML, sender)); diff --git a/ydb/core/statistics/ut_common/ut_common.cpp b/ydb/core/statistics/ut_common/ut_common.cpp index 5173bf8a8bee..20de0c71668a 100644 --- a/ydb/core/statistics/ut_common/ut_common.cpp +++ b/ydb/core/statistics/ut_common/ut_common.cpp @@ -277,7 +277,7 @@ void CreateColumnTable(TTestEnv& env, const TString& databaseName, const TString ExecuteYqlScript(env, Sprintf(R"( CREATE TABLE `%s` ( Key Uint64 NOT NULL, - Value Uint64, + Value String, PRIMARY KEY (Key) ) PARTITION BY HASH(Key) @@ -332,13 +332,13 @@ void PrepareColumnTable(TTestEnv& env, const TString& databaseName, const TStrin reqKeyType->mutable_type()->set_type_id(Ydb::Type::UINT64); auto* reqValueType = reqRowType->add_members(); reqValueType->set_name("Value"); - reqValueType->mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT64); + reqValueType->mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); auto* reqRows = rows->mutable_value(); for (size_t j = 0; j < rowsInBlock && i < ColumnTableRowsNumber; ++i, ++j) { auto* row = reqRows->add_items(); row->add_items()->set_uint64_value(i); - row->add_items()->set_uint64_value(i); + row->add_items()->set_bytes_value(ToString(i)); } i -= overlap; auto future = NRpcService::DoLocalRpc( From 026201cae0e517372ae79f7ff0fe6d29ef7adf21 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Thu, 4 Dec 2025 16:59:56 +0300 Subject: [PATCH 09/13] statistics: refactor test helpers --- .../aggregator/ut/ut_analyze_columnshard.cpp | 56 +++++------ .../aggregator/ut/ut_traverse_columnshard.cpp | 51 +++++----- .../statistics/service/ut/ut_http_request.cpp | 28 ++++-- ydb/core/statistics/ut_common/ut_common.cpp | 99 ++++++++++--------- ydb/core/statistics/ut_common/ut_common.h | 22 ++--- 5 files changed, 136 insertions(+), 120 deletions(-) diff --git a/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp b/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp index 0c28950bddec..9d6271860683 100644 --- a/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp +++ b/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp @@ -10,12 +10,20 @@ namespace NKikimr { namespace NStat { +namespace { + +TTableInfo PrepareDatabaseAndTable(TTestEnv& env) { + CreateDatabase(env, "Database"); + return PrepareColumnTable(env, "Database", "Table", 1); +} + +} // namespace + Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(AnalyzeShard) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - const auto databaseInfo = PrepareDatabaseColumnTables(env, 1, 1); - const auto& tableInfo = databaseInfo.Tables[0]; + const auto tableInfo = PrepareDatabaseAndTable(env); AnalyzeShard(runtime, tableInfo.ShardIds[0], tableInfo.PathId); } @@ -23,8 +31,7 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(Analyze) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - const auto databaseInfo = PrepareDatabaseColumnTables(env, 1, 1); - const auto& tableInfo = databaseInfo.Tables[0]; + const auto tableInfo = PrepareDatabaseAndTable(env); Analyze(runtime, tableInfo.SaTabletId, {tableInfo.PathId}); } @@ -33,19 +40,17 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); CreateDatabase(env, "Database"); - CreateColumnTable(env, "Database", "Table", 4); + const auto tableInfo = CreateColumnTable(env, "Database", "Table", 4); - ui64 saTabletId = 0; - auto pathId = ResolvePathId(runtime, "/Root/Database/Table", nullptr, &saTabletId); - - Analyze(runtime, saTabletId, {pathId}); + Analyze(runtime, tableInfo.SaTabletId, {tableInfo.PathId}); } Y_UNIT_TEST(AnalyzeServerless) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto databaseInfo = PrepareServerlessDatabaseColumnTables(env, 1, 1); - const auto& tableInfo = databaseInfo.Tables[0]; + CreateDatabase(env, "Shared", 1, true); + CreateServerlessDatabase(env, "Database", "/Root/Shared"); + auto tableInfo = PrepareColumnTable(env, "Database", "Table", 1); Analyze(runtime, tableInfo.SaTabletId, {tableInfo.PathId}, "operationId", "/Root/Database"); } @@ -53,8 +58,7 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(AnalyzeAnalyzeOneColumnTableSpecificColumns) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - const auto databaseInfo = PrepareDatabaseColumnTables(env, 1, 1); - const auto& tableInfo = databaseInfo.Tables[0]; + const auto tableInfo = PrepareDatabaseAndTable(env); Analyze(runtime, tableInfo.SaTabletId, {{tableInfo.PathId, {1, 2}}}); } @@ -62,10 +66,12 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(AnalyzeTwoColumnTables) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto databaseInfo = PrepareDatabaseColumnTables(env, 2, 1); - const auto& tableInfos = databaseInfo.Tables; - Analyze(runtime, tableInfos[0].SaTabletId, {tableInfos[0].PathId, tableInfos[1].PathId}); + CreateDatabase(env, "Database"); + const auto table1 = PrepareColumnTable(env, "Database", "Table1", 1); + const auto table2 = PrepareColumnTable(env, "Database", "Table2", 1); + + Analyze(runtime, table1.SaTabletId, {table1.PathId, table2.PathId}); } Y_UNIT_TEST(AnalyzeStatus) { @@ -74,8 +80,7 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { auto sender = runtime.AllocateEdgeActor(); TBlockEvents block(runtime); - const auto databaseInfo = PrepareDatabaseColumnTables(env, 1, 1); - const auto& tableInfo = databaseInfo.Tables[0]; + const auto tableInfo = PrepareDatabaseAndTable(env); const TString operationId = "operationId"; AnalyzeStatus(runtime, sender, tableInfo.SaTabletId, operationId, NKikimrStat::TEvAnalyzeStatusResponse::STATUS_NO_OPERATION); @@ -110,8 +115,7 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(AnalyzeSameOperationId) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - const auto databaseInfo = PrepareDatabaseColumnTables(env, 1, 1); - const auto& tableInfo = databaseInfo.Tables[0]; + const auto tableInfo = PrepareDatabaseAndTable(env); auto sender = runtime.AllocateEdgeActor(); const TString operationId = "operationId"; @@ -141,8 +145,7 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(AnalyzeMultiOperationId) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - const auto databaseInfo = PrepareDatabaseColumnTables(env, 1, 1); - const auto& tableInfo = databaseInfo.Tables[0]; + const auto tableInfo = PrepareDatabaseAndTable(env); auto sender = runtime.AllocateEdgeActor(); auto GetOperationId = [] (size_t i) { return TStringBuilder() << "operationId" << i; }; @@ -171,8 +174,7 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(AnalyzeRebootSa) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - const auto databaseInfo = PrepareDatabaseColumnTables(env, 1, 1); - const auto& tableInfo = databaseInfo.Tables[0]; + const auto tableInfo = PrepareDatabaseAndTable(env); auto sender = runtime.AllocateEdgeActor(); bool eventSeen = false; @@ -198,8 +200,7 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(AnalyzeRebootColumnShard) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - const auto databaseInfo = PrepareDatabaseColumnTables(env, 1, 1); - const auto& tableInfo = databaseInfo.Tables[0]; + const auto tableInfo = PrepareDatabaseAndTable(env); auto sender = runtime.AllocateEdgeActor(); TBlockEvents block(runtime); @@ -218,8 +219,7 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(AnalyzeDeadline) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - const auto databaseInfo = PrepareDatabaseColumnTables(env, 1, 1); - const auto& tableInfo = databaseInfo.Tables[0]; + const auto tableInfo = PrepareDatabaseAndTable(env); auto sender = runtime.AllocateEdgeActor(); TBlockEvents block(runtime); diff --git a/ydb/core/statistics/aggregator/ut/ut_traverse_columnshard.cpp b/ydb/core/statistics/aggregator/ut/ut_traverse_columnshard.cpp index b935724c5d68..a2135044ac74 100644 --- a/ydb/core/statistics/aggregator/ut/ut_traverse_columnshard.cpp +++ b/ydb/core/statistics/aggregator/ut/ut_traverse_columnshard.cpp @@ -13,6 +13,8 @@ namespace NStat { namespace { +static const ui8 ShardCount = 4; + // TODO: check for arbitrary set of values of type T (including frequent duplicates) // numbers (1..N) were count as a sketch. Check sketch properties bool CheckCountMinSketch(const std::shared_ptr& sketch, const ui32 N) { @@ -38,16 +40,24 @@ TTestEnv CreateTestEnv() { }); } +TTableInfo PrepareDatabaseAndTableWithIndexes(TTestEnv& env) { + CreateDatabase(env, "Database"); + return PrepareColumnTableWithIndexes(env, "Database", "Table", ShardCount); } -Y_UNIT_TEST_SUITE(TraverseColumnShard) { - const ui8 ShardCount = 4; +TTableInfo PrepareServerlessDatabaseAndTableWithIndexes(TTestEnv& env) { + CreateDatabase(env, "Shared", 1, true); + CreateServerlessDatabase(env, "Database", "/Root/Shared"); + return PrepareColumnTableWithIndexes(env, "Database", "Table", ShardCount); +} +} + +Y_UNIT_TEST_SUITE(TraverseColumnShard) { Y_UNIT_TEST(TraverseColumnTable) { TTestEnv env = CreateTestEnv(); auto& runtime = *env.GetServer().GetRuntime(); - const auto databaseInfo = PrepareDatabaseColumnTables(env, 1, ShardCount); - const auto& tableInfo = databaseInfo.Tables[0]; + const auto tableInfo = PrepareDatabaseAndTableWithIndexes(env); WaitForSavedStatistics(runtime, tableInfo.PathId); @@ -59,8 +69,7 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { Y_UNIT_TEST(TraverseServerlessColumnTable) { TTestEnv env = CreateTestEnv(); auto& runtime = *env.GetServer().GetRuntime(); - auto databaseInfo = PrepareServerlessDatabaseColumnTables(env, 1, ShardCount); - const auto& tableInfo = databaseInfo.Tables[0]; + const auto tableInfo = PrepareServerlessDatabaseAndTableWithIndexes(env); WaitForSavedStatistics(runtime, tableInfo.PathId); @@ -72,8 +81,7 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { Y_UNIT_TEST(TraverseColumnTableRebootColumnshard) { TTestEnv env = CreateTestEnv(); auto& runtime = *env.GetServer().GetRuntime(); - const auto databaseInfo = PrepareDatabaseColumnTables(env, 1, ShardCount); - const auto& tableInfo = databaseInfo.Tables[0]; + const auto tableInfo = PrepareDatabaseAndTableWithIndexes(env); auto sender = runtime.AllocateEdgeActor(); WaitForSavedStatistics(runtime, tableInfo.PathId); @@ -88,8 +96,7 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { Y_UNIT_TEST(TraverseColumnTableRebootSaTabletBeforeResolve) { TTestEnv env = CreateTestEnv(); auto& runtime = *env.GetServer().GetRuntime(); - const auto databaseInfo = PrepareDatabaseColumnTables(env, 1, ShardCount); - const auto& tableInfo = databaseInfo.Tables[0]; + const auto tableInfo = PrepareDatabaseAndTableWithIndexes(env); auto sender = runtime.AllocateEdgeActor(); TBlockEvents block(runtime); @@ -114,8 +121,7 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { Y_UNIT_TEST(TraverseColumnTableRebootSaTabletBeforeReqDistribution) { TTestEnv env = CreateTestEnv(); auto& runtime = *env.GetServer().GetRuntime(); - const auto databaseInfo = PrepareDatabaseColumnTables(env, 1, ShardCount); - const auto& tableInfo = databaseInfo.Tables[0]; + const auto tableInfo = PrepareDatabaseAndTableWithIndexes(env); auto sender = runtime.AllocateEdgeActor(); bool eventSeen = false; @@ -135,8 +141,7 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { Y_UNIT_TEST(TraverseColumnTableRebootSaTabletBeforeAggregate) { TTestEnv env = CreateTestEnv(); auto& runtime = *env.GetServer().GetRuntime(); - const auto databaseInfo = PrepareDatabaseColumnTables(env, 1, ShardCount); - const auto& tableInfo = databaseInfo.Tables[0]; + const auto tableInfo = PrepareDatabaseAndTableWithIndexes(env); auto sender = runtime.AllocateEdgeActor(); bool eventSeen = false; @@ -156,8 +161,7 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { Y_UNIT_TEST(TraverseColumnTableRebootSaTabletBeforeSave) { TTestEnv env = CreateTestEnv(); auto& runtime = *env.GetServer().GetRuntime(); - const auto databaseInfo = PrepareDatabaseColumnTables(env, 1, ShardCount); - const auto& tableInfo = databaseInfo.Tables[0]; + const auto tableInfo = PrepareDatabaseAndTableWithIndexes(env); auto sender = runtime.AllocateEdgeActor(); bool eventSeen = false; @@ -177,8 +181,7 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { Y_UNIT_TEST(TraverseColumnTableRebootSaTabletInAggregate) { TTestEnv env = CreateTestEnv(); auto& runtime = *env.GetServer().GetRuntime(); - const auto databaseInfo = PrepareDatabaseColumnTables(env, 1, ShardCount); - const auto& tableInfo = databaseInfo.Tables[0]; + const auto tableInfo = PrepareDatabaseAndTableWithIndexes(env); auto sender = runtime.AllocateEdgeActor(); int observerCount = 0; @@ -199,8 +202,7 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { Y_UNIT_TEST(TraverseColumnTableHiveDistributionZeroNodes) { TTestEnv env = CreateTestEnv(); auto& runtime = *env.GetServer().GetRuntime(); - const auto databaseInfo = PrepareDatabaseColumnTables(env, 1, ShardCount); - const auto& tableInfo = databaseInfo.Tables[0]; + const auto tableInfo = PrepareDatabaseAndTableWithIndexes(env); bool observerFirstExec = true; auto observer = runtime.AddObserver( @@ -245,8 +247,7 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { Y_UNIT_TEST(TraverseColumnTableHiveDistributionAbsentNodes) { TTestEnv env = CreateTestEnv(); auto& runtime = *env.GetServer().GetRuntime(); - const auto databaseInfo = PrepareDatabaseColumnTables(env, 1, ShardCount); - const auto& tableInfo = databaseInfo.Tables[0]; + const auto tableInfo = PrepareDatabaseAndTableWithIndexes(env); bool observerFirstExec = true; auto observer = runtime.AddObserver( @@ -283,8 +284,7 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { Y_UNIT_TEST(TraverseColumnTableAggrStatUnavailableNode) { TTestEnv env = CreateTestEnv(); auto& runtime = *env.GetServer().GetRuntime(); - const auto databaseInfo = PrepareDatabaseColumnTables(env, 1, ShardCount); - const auto& tableInfo = databaseInfo.Tables[0]; + const auto tableInfo = PrepareDatabaseAndTableWithIndexes(env); bool observerFirstExec = true; auto observer = runtime.AddObserver( @@ -321,8 +321,7 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { Y_UNIT_TEST(TraverseColumnTableAggrStatNonLocalTablet) { TTestEnv env = CreateTestEnv(); auto& runtime = *env.GetServer().GetRuntime(); - const auto databaseInfo = PrepareDatabaseColumnTables(env, 1, ShardCount); - const auto& tableInfo = databaseInfo.Tables[0]; + const auto tableInfo = PrepareDatabaseAndTableWithIndexes(env); bool observerFirstExec = true; auto observer = runtime.AddObserver( diff --git a/ydb/core/statistics/service/ut/ut_http_request.cpp b/ydb/core/statistics/service/ut/ut_http_request.cpp index a2ea215d6836..fcc1f18930fc 100644 --- a/ydb/core/statistics/service/ut/ut_http_request.cpp +++ b/ydb/core/statistics/service/ut/ut_http_request.cpp @@ -8,13 +8,22 @@ namespace NKikimr { namespace NStat { +namespace { + +TTableInfo PrepareDatabaseAndTable(TTestEnv& env, bool isServerless) { + if (isServerless) { + CreateDatabase(env, "Shared", 1, true); + CreateServerlessDatabase(env, "Database", "/Root/Shared"); + } else { + CreateDatabase(env, "Database"); + } + return PrepareColumnTable(env, "Database", "Table", 10); +} + void AnalyzeTest(bool isServerless) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - const auto databaseInfo = isServerless - ? PrepareServerlessDatabaseColumnTables(env, 1, 10) - : PrepareDatabaseColumnTables(env, 1, 10); - const auto& tableInfo = databaseInfo.Tables[0]; + const auto tableInfo = PrepareDatabaseAndTable(env, isServerless); const auto sender = runtime.AllocateEdgeActor(); runtime.Register(new THttpRequest(THttpRequest::ERequestType::ANALYZE, { @@ -34,10 +43,8 @@ void AnalyzeTest(bool isServerless) { void ProbeTest(bool isServerless) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - const auto databaseInfo = isServerless - ? PrepareServerlessDatabaseColumnTables(env, 1, 10) - : PrepareDatabaseColumnTables(env, 1, 10); - const auto& tableInfo = databaseInfo.Tables[0]; + const auto tableInfo = PrepareDatabaseAndTable(env, isServerless); + TString columnName = "Value"; const auto sender = runtime.AllocateEdgeActor(); @@ -116,6 +123,8 @@ void ProbeBaseStatsTest(bool isServerless) { UNIT_ASSERT_VALUES_EQUAL(json["row_count"].GetIntegerSafe(), ColumnTableRowsNumber); } +} // namespace + Y_UNIT_TEST_SUITE(HttpRequest) { Y_UNIT_TEST(Analyze) { AnalyzeTest(false); @@ -128,8 +137,7 @@ Y_UNIT_TEST_SUITE(HttpRequest) { Y_UNIT_TEST(Status) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - const auto databaseInfo = PrepareDatabaseColumnTables(env, 1, 10); - const auto& tableInfo = databaseInfo.Tables[0]; + const auto tableInfo = PrepareDatabaseAndTable(env, /*isServerless=*/false); const auto sender = runtime.AllocateEdgeActor(); const auto operationId = TULIDGenerator().Next(TInstant::Now()).ToString(); diff --git a/ydb/core/statistics/ut_common/ut_common.cpp b/ydb/core/statistics/ut_common/ut_common.cpp index 20de0c71668a..50ec9c339b2f 100644 --- a/ydb/core/statistics/ut_common/ut_common.cpp +++ b/ydb/core/statistics/ut_common/ut_common.cpp @@ -268,7 +268,7 @@ void CreateUniformTable(TTestEnv& env, const TString& databaseName, const TStrin ExecuteYqlScript(env, replace); } -void CreateColumnTable(TTestEnv& env, const TString& databaseName, const TString& tableName, +TTableInfo CreateColumnTable(TTestEnv& env, const TString& databaseName, const TString& tableName, int shardCount) { auto fullTableName = Sprintf("Root/%s/%s", databaseName.c_str(), tableName.c_str()); @@ -287,12 +287,63 @@ void CreateColumnTable(TTestEnv& env, const TString& databaseName, const TString ); )", fullTableName.c_str(), shardCount)); runtime.SimulateSleep(TDuration::Seconds(1)); + + TTableInfo tableInfo; + tableInfo.Path = Sprintf("/Root/%s/%s", databaseName.c_str(), tableName.c_str()); + tableInfo.ShardIds = GetColumnTableShards(runtime, runtime.AllocateEdgeActor(), tableInfo.Path); + tableInfo.PathId = ResolvePathId(runtime, tableInfo.Path, &tableInfo.DomainKey, &tableInfo.SaTabletId); + return tableInfo; +} + +void InsertDataIntoTable(TTestEnv& env, const TString& databaseName, const TString& tableName, size_t rowCount) { + auto fullTableName = Sprintf("Root/%s/%s", databaseName.c_str(), tableName.c_str()); + auto& runtime = *env.GetServer().GetRuntime(); + + using TEvBulkUpsertRequest = NGRpcService::TGrpcRequestOperationCall< + Ydb::Table::BulkUpsertRequest, + Ydb::Table::BulkUpsertResponse>; + + Ydb::Table::BulkUpsertRequest request; + request.set_table(fullTableName); + auto* rows = request.mutable_rows(); + auto* reqRowType = rows->mutable_type()->mutable_list_type()->mutable_item()->mutable_struct_type(); + auto* reqKeyType = reqRowType->add_members(); + reqKeyType->set_name("Key"); + reqKeyType->mutable_type()->set_type_id(Ydb::Type::UINT64); + auto* reqValueType = reqRowType->add_members(); + reqValueType->set_name("Value"); + reqValueType->mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + + auto* reqRows = rows->mutable_value(); + + for (size_t i = 0; i < rowCount; ++i) { + auto* row = reqRows->add_items(); + row->add_items()->set_uint64_value(i); + row->add_items()->set_bytes_value(ToString(i)); + } + + auto future = NRpcService::DoLocalRpc( + std::move(request), "", "", runtime.GetActorSystem(0)); + auto response = runtime.WaitFuture(std::move(future)); + + UNIT_ASSERT(response.operation().ready()); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); + env.GetController()->WaitActualization(TDuration::Seconds(1)); +} + + +TTableInfo PrepareColumnTable(TTestEnv& env, const TString& databaseName, const TString& tableName, + int shardCount) +{ + auto info = CreateColumnTable(env, databaseName, tableName, shardCount); + InsertDataIntoTable(env, databaseName, tableName, ColumnTableRowsNumber); + return info; } -void PrepareColumnTable(TTestEnv& env, const TString& databaseName, const TString& tableName, +TTableInfo PrepareColumnTableWithIndexes(TTestEnv& env, const TString& databaseName, const TString& tableName, int shardCount) { - CreateColumnTable(env, databaseName, tableName, shardCount); + auto info = CreateColumnTable(env, databaseName, tableName, shardCount); auto fullTableName = Sprintf("Root/%s/%s", databaseName.c_str(), tableName.c_str()); auto& runtime = *env.GetServer().GetRuntime(); @@ -350,48 +401,8 @@ void PrepareColumnTable(TTestEnv& env, const TString& databaseName, const TStrin } env.GetController()->WaitActualization(TDuration::Seconds(1)); -} - -std::vector GatherColumnTablesInfo(TTestEnv& env, const TString& fullDbName, ui8 tableCount) { - auto& runtime = *env.GetServer().GetRuntime(); - auto sender = runtime.AllocateEdgeActor(); - - std::vector ret; - for (ui8 tableId = 1; tableId <= tableCount; tableId++) { - TTableInfo tableInfo; - tableInfo.Path = Sprintf("%s/Table%u", fullDbName.c_str(), tableId); - tableInfo.ShardIds = GetColumnTableShards(runtime, sender, tableInfo.Path); - tableInfo.PathId = ResolvePathId(runtime, tableInfo.Path, &tableInfo.DomainKey, &tableInfo.SaTabletId); - ret.emplace_back(tableInfo); - } - return ret; -} - -TDatabaseInfo PrepareDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount) { - auto fullDbName = CreateDatabase(env, "Database"); - - for (ui8 tableId = 1; tableId <= tableCount; tableId++) { - PrepareColumnTable(env, "Database", Sprintf("Table%u", tableId), shardCount); - } - - return { - .FullDatabaseName = fullDbName, - .Tables = GatherColumnTablesInfo(env, fullDbName, tableCount) - }; -} - -TDatabaseInfo PrepareServerlessDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount) { - auto fullServerlessDbName = CreateDatabase(env, "Shared", 1, true); - auto fullDbName = CreateServerlessDatabase(env, "Database", "/Root/Shared"); - - for (ui8 tableId = 1; tableId <= tableCount; tableId++) { - PrepareColumnTable(env, "Database", Sprintf("Table%u", tableId), shardCount); - } - return { - .FullDatabaseName = fullServerlessDbName, - .Tables = GatherColumnTablesInfo(env, fullDbName, tableCount) - }; + return info; } void DropTable(TTestEnv& env, const TString& databaseName, const TString& tableName) { diff --git a/ydb/core/statistics/ut_common/ut_common.h b/ydb/core/statistics/ut_common/ut_common.h index 9fc56d3c6cd7..e3ac44f88161 100644 --- a/ydb/core/statistics/ut_common/ut_common.h +++ b/ydb/core/statistics/ut_common/ut_common.h @@ -71,12 +71,6 @@ TString CreateDatabase(TTestEnv& env, const TString& databaseName, TString CreateServerlessDatabase(TTestEnv& env, const TString& databaseName, const TString& sharedName, size_t nodeCount = 0); -// Create empty column table with the requested number of shards. -void CreateColumnTable(TTestEnv& env, const TString& databaseName, const TString& tableName, int shardCount); -// Create a column table, enable count-min-sketch column statistics, -// and insert ColumnTableRowsNumber rows. -void PrepareColumnTable(TTestEnv& env, const TString& databaseName, const TString& tableName, int shardCount); - struct TTableInfo { std::vector ShardIds; ui64 SaTabletId; @@ -85,13 +79,17 @@ struct TTableInfo { TString Path; }; -struct TDatabaseInfo { - TString FullDatabaseName; - std::vector Tables; -}; +// Create empty column table with the requested number of shards. +TTableInfo CreateColumnTable(TTestEnv& env, const TString& databaseName, const TString& tableName, int shardCount); + +void InsertDataIntoTable(TTestEnv& env, const TString& databaseName, const TString& tableName, size_t rowCount); + +// Create a column table and insert ColumnTableRowsNumber rows. +TTableInfo PrepareColumnTable(TTestEnv& env, const TString& databaseName, const TString& tableName, int shardCount); -TDatabaseInfo PrepareDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount); -TDatabaseInfo PrepareServerlessDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount); +// Create a column table, enable count-min-sketch column indexes, +// and insert ColumnTableRowsNumber rows with some overlap to trigger compaction. +TTableInfo PrepareColumnTableWithIndexes(TTestEnv& env, const TString& databaseName, const TString& tableName, int shardCount); TPathId ResolvePathId(TTestActorRuntime& runtime, const TString& path, TPathId* domainKey = nullptr, ui64* saTabletId = nullptr); From 0b48dcc46abfbee11ab28c5c94fae568ae242ca5 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Thu, 4 Dec 2025 18:58:16 +0300 Subject: [PATCH 10/13] tests: split CreateUniformTable into two functions --- .../aggregator/ut/ut_analyze_datashard.cpp | 8 ++++---- .../aggregator/ut/ut_traverse_datashard.cpp | 16 ++++++++-------- ydb/core/statistics/database/ut/ut_database.cpp | 2 +- .../service/ut/ut_basic_statistics.cpp | 2 +- ydb/core/statistics/ut_common/ut_common.cpp | 12 ++++-------- ydb/core/statistics/ut_common/ut_common.h | 5 ++++- 6 files changed, 22 insertions(+), 23 deletions(-) diff --git a/ydb/core/statistics/aggregator/ut/ut_analyze_datashard.cpp b/ydb/core/statistics/aggregator/ut/ut_analyze_datashard.cpp index 4fb32316691e..a6c6ffac3b18 100644 --- a/ydb/core/statistics/aggregator/ut/ut_analyze_datashard.cpp +++ b/ydb/core/statistics/aggregator/ut/ut_analyze_datashard.cpp @@ -18,7 +18,7 @@ Y_UNIT_TEST_SUITE(AnalyzeDatashard) { auto& runtime = *env.GetServer().GetRuntime(); CreateDatabase(env, "Database"); - CreateUniformTable(env, "Database", "Table"); + PrepareUniformTable(env, "Database", "Table"); ui64 saTabletId; auto pathId = ResolvePathId(runtime, "/Root/Database/Table", nullptr, &saTabletId); @@ -34,8 +34,8 @@ Y_UNIT_TEST_SUITE(AnalyzeDatashard) { auto& runtime = *env.GetServer().GetRuntime(); CreateDatabase(env, "Database"); - CreateUniformTable(env, "Database", "Table1"); - CreateUniformTable(env, "Database", "Table2"); + PrepareUniformTable(env, "Database", "Table1"); + PrepareUniformTable(env, "Database", "Table2"); ui64 saTabletId1; auto pathId1 = ResolvePathId(runtime, "/Root/Database/Table1", nullptr, &saTabletId1); @@ -53,7 +53,7 @@ Y_UNIT_TEST_SUITE(AnalyzeDatashard) { auto& runtime = *env.GetServer().GetRuntime(); CreateDatabase(env, "Database"); - CreateUniformTable(env, "Database", "Table"); + PrepareUniformTable(env, "Database", "Table"); ui64 saTabletId = 0; auto pathId = ResolvePathId(runtime, "/Root/Database/Table", nullptr, &saTabletId); diff --git a/ydb/core/statistics/aggregator/ut/ut_traverse_datashard.cpp b/ydb/core/statistics/aggregator/ut/ut_traverse_datashard.cpp index 4daf1336bd3a..102f31c3fee3 100644 --- a/ydb/core/statistics/aggregator/ut/ut_traverse_datashard.cpp +++ b/ydb/core/statistics/aggregator/ut/ut_traverse_datashard.cpp @@ -28,7 +28,7 @@ Y_UNIT_TEST_SUITE(TraverseDatashard) { auto& runtime = *env.GetServer().GetRuntime(); CreateDatabase(env, "Database"); - CreateUniformTable(env, "Database", "Table"); + PrepareUniformTable(env, "Database", "Table"); auto pathId = ResolvePathId(runtime, "/Root/Database/Table"); ValidateCountMinAbsence(runtime, pathId); @@ -39,8 +39,8 @@ Y_UNIT_TEST_SUITE(TraverseDatashard) { auto& runtime = *env.GetServer().GetRuntime(); CreateDatabase(env, "Database"); - CreateUniformTable(env, "Database", "Table1"); - CreateUniformTable(env, "Database", "Table2"); + PrepareUniformTable(env, "Database", "Table1"); + PrepareUniformTable(env, "Database", "Table2"); auto pathId1 = ResolvePathId(runtime, "/Root/Database/Table1"); auto pathId2 = ResolvePathId(runtime, "/Root/Database/Table2"); @@ -54,7 +54,7 @@ Y_UNIT_TEST_SUITE(TraverseDatashard) { CreateDatabase(env, "Shared", 1, true); CreateServerlessDatabase(env, "Serverless", "/Root/Shared"); - CreateUniformTable(env, "Serverless", "Table"); + PrepareUniformTable(env, "Serverless", "Table"); auto pathId = ResolvePathId(runtime, "/Root/Serverless/Table"); ValidateCountMinAbsence(runtime, pathId); @@ -66,8 +66,8 @@ Y_UNIT_TEST_SUITE(TraverseDatashard) { CreateDatabase(env, "Shared", 1, true); CreateServerlessDatabase(env, "Serverless", "/Root/Shared"); - CreateUniformTable(env, "Serverless", "Table1"); - CreateUniformTable(env, "Serverless", "Table2"); + PrepareUniformTable(env, "Serverless", "Table1"); + PrepareUniformTable(env, "Serverless", "Table2"); auto pathId1 = ResolvePathId(runtime, "/Root/Serverless/Table1"); auto pathId2 = ResolvePathId(runtime, "/Root/Serverless/Table2"); @@ -82,8 +82,8 @@ Y_UNIT_TEST_SUITE(TraverseDatashard) { CreateDatabase(env, "Shared", 1, true); CreateServerlessDatabase(env, "Serverless1", "/Root/Shared"); CreateServerlessDatabase(env, "Serverless2", "/Root/Shared"); - CreateUniformTable(env, "Serverless1", "Table1"); - CreateUniformTable(env, "Serverless2", "Table2"); + PrepareUniformTable(env, "Serverless1", "Table1"); + PrepareUniformTable(env, "Serverless2", "Table2"); auto pathId1 = ResolvePathId(runtime, "/Root/Serverless1/Table1"); auto pathId2 = ResolvePathId(runtime, "/Root/Serverless2/Table2"); diff --git a/ydb/core/statistics/database/ut/ut_database.cpp b/ydb/core/statistics/database/ut/ut_database.cpp index ab2e76f193a5..88ddd9f65e98 100644 --- a/ydb/core/statistics/database/ut/ut_database.cpp +++ b/ydb/core/statistics/database/ut/ut_database.cpp @@ -89,7 +89,7 @@ Y_UNIT_TEST_SUITE(StatisticsSaveLoad) { auto& runtime = *env.GetServer().GetRuntime(); CreateDatabase(env, "Database", 1, true); - CreateUniformTable(env, "Database", "Table"); + PrepareUniformTable(env, "Database", "Table"); NYdb::EStatus status; auto test = [&] () { diff --git a/ydb/core/statistics/service/ut/ut_basic_statistics.cpp b/ydb/core/statistics/service/ut/ut_basic_statistics.cpp index a0c34e05a806..a472851d1592 100644 --- a/ydb/core/statistics/service/ut/ut_basic_statistics.cpp +++ b/ydb/core/statistics/service/ut/ut_basic_statistics.cpp @@ -322,7 +322,7 @@ Y_UNIT_TEST_SUITE(BasicStatistics) { TTestEnv env(1, 1); CreateDatabase(env, "Database"); - CreateUniformTable(env, "Database", "Table"); + PrepareUniformTable(env, "Database", "Table"); TestNotFullStatistics(env, /*shardCount=*/ 4, /*expectedRowCount=*/ 4); } diff --git a/ydb/core/statistics/ut_common/ut_common.cpp b/ydb/core/statistics/ut_common/ut_common.cpp index 50ec9c339b2f..c6f932ce2127 100644 --- a/ydb/core/statistics/ut_common/ut_common.cpp +++ b/ydb/core/statistics/ut_common/ut_common.cpp @@ -253,6 +253,10 @@ void CreateUniformTable(TTestEnv& env, const TString& databaseName, const TStrin ) WITH ( UNIFORM_PARTITIONS = 4 ); )", databaseName.c_str(), tableName.c_str())); +} + +void PrepareUniformTable(TTestEnv& env, const TString& databaseName, const TString& tableName) { + CreateUniformTable(env, databaseName, tableName); TStringBuilder replace; replace << Sprintf("REPLACE INTO `Root/%s/%s` (Key, Value) VALUES ", @@ -438,14 +442,6 @@ std::shared_ptr ExtractCountMin(TTestActorRuntime& runtime, con return stat.CountMin; } -void ValidateCountMinColumnshard(TTestActorRuntime& runtime, const TPathId& pathId, ui64 expectedProbe) { - auto countMin = ExtractCountMin(runtime, pathId); - - ui32 value = 1; - auto actualProbe = countMin->Probe((const char *)&value, sizeof(value)); - UNIT_ASSERT_VALUES_EQUAL(actualProbe, expectedProbe); -} - void ValidateCountMinDatashard(TTestActorRuntime& runtime, TPathId pathId) { auto countMin = ExtractCountMin(runtime, pathId); diff --git a/ydb/core/statistics/ut_common/ut_common.h b/ydb/core/statistics/ut_common/ut_common.h index e3ac44f88161..f63a6a97dbaa 100644 --- a/ydb/core/statistics/ut_common/ut_common.h +++ b/ydb/core/statistics/ut_common/ut_common.h @@ -98,11 +98,14 @@ NKikimrScheme::TEvDescribeSchemeResult DescribeTable( TVector GetTableShards(TTestActorRuntime& runtime, TActorId sender, const TString &path); TVector GetColumnTableShards(TTestActorRuntime& runtime, TActorId sender,const TString &path); +// Create a datashard table with 4 uniform shards. void CreateUniformTable(TTestEnv& env, const TString& databaseName, const TString& tableName); +// Create a datashard table with 4 uniform shards and insert 1 row into each shard. +void PrepareUniformTable(TTestEnv& env, const TString& databaseName, const TString& tableName); + void DropTable(TTestEnv& env, const TString& databaseName, const TString& tableName); std::shared_ptr ExtractCountMin(TTestActorRuntime& runtime, const TPathId& pathId, ui64 columnTag = 1); -void ValidateCountMinColumnshard(TTestActorRuntime& runtime, const TPathId& pathId, ui64 expectedProbe); void ValidateCountMinDatashard(TTestActorRuntime& runtime, TPathId pathId); void ValidateCountMinAbsence(TTestActorRuntime& runtime, TPathId pathId); From 00c0b073acfc8602d98cb111337d50b7b4fe4385 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Fri, 5 Dec 2025 15:12:33 +0300 Subject: [PATCH 11/13] more test helpers refactoring (passes existing tests) --- .../aggregator/ut/ut_analyze_datashard.cpp | 34 +++++-- .../service/ut/ut_column_statistics.cpp | 91 +++++-------------- .../statistics/service/ut/ut_http_request.cpp | 4 +- ydb/core/statistics/ut_common/ut_common.cpp | 80 ++++++++++++---- ydb/core/statistics/ut_common/ut_common.h | 26 +++++- 5 files changed, 136 insertions(+), 99 deletions(-) diff --git a/ydb/core/statistics/aggregator/ut/ut_analyze_datashard.cpp b/ydb/core/statistics/aggregator/ut/ut_analyze_datashard.cpp index a6c6ffac3b18..0d52ca5ee150 100644 --- a/ydb/core/statistics/aggregator/ut/ut_analyze_datashard.cpp +++ b/ydb/core/statistics/aggregator/ut/ut_analyze_datashard.cpp @@ -10,6 +10,26 @@ namespace NKikimr { namespace NStat { +namespace { + +void PrepareTable(TTestEnv& env, const TString& tableName) { + CreateUniformTable(env, "Database", tableName); + InsertDataIntoTable(env, "Database", tableName, RowsWithFewDistinctValues(1000)); +} + +void ValidateCountMinSketch(TTestActorRuntime& runtime, const TPathId& pathId) { + std::vector expected = { + { + .Tag = 2, // Value column + .Probes{ {"1", 100}, {"2", 100}, {"10", 0} } + } + }; + + CheckCountMinSketch(runtime, pathId, expected); +} + +} // namespace + Y_UNIT_TEST_SUITE(AnalyzeDatashard) { Y_UNIT_TEST(AnalyzeOneTable) { @@ -18,14 +38,14 @@ Y_UNIT_TEST_SUITE(AnalyzeDatashard) { auto& runtime = *env.GetServer().GetRuntime(); CreateDatabase(env, "Database"); - PrepareUniformTable(env, "Database", "Table"); + PrepareTable(env, "Table"); ui64 saTabletId; auto pathId = ResolvePathId(runtime, "/Root/Database/Table", nullptr, &saTabletId); Analyze(runtime, saTabletId, {{pathId}}); - ValidateCountMinDatashard(runtime, pathId); + ValidateCountMinSketch(runtime, pathId); } Y_UNIT_TEST(AnalyzeTwoTables) { @@ -34,8 +54,8 @@ Y_UNIT_TEST_SUITE(AnalyzeDatashard) { auto& runtime = *env.GetServer().GetRuntime(); CreateDatabase(env, "Database"); - PrepareUniformTable(env, "Database", "Table1"); - PrepareUniformTable(env, "Database", "Table2"); + PrepareTable(env, "Table1"); + PrepareTable(env, "Table2"); ui64 saTabletId1; auto pathId1 = ResolvePathId(runtime, "/Root/Database/Table1", nullptr, &saTabletId1); @@ -43,8 +63,8 @@ Y_UNIT_TEST_SUITE(AnalyzeDatashard) { Analyze(runtime, saTabletId1, {pathId1, pathId2}); - ValidateCountMinDatashard(runtime, pathId1); - ValidateCountMinDatashard(runtime, pathId2); + ValidateCountMinSketch(runtime, pathId1); + ValidateCountMinSketch(runtime, pathId2); } Y_UNIT_TEST(DropTableNavigateError) { @@ -53,7 +73,7 @@ Y_UNIT_TEST_SUITE(AnalyzeDatashard) { auto& runtime = *env.GetServer().GetRuntime(); CreateDatabase(env, "Database"); - PrepareUniformTable(env, "Database", "Table"); + PrepareTable(env, "Table"); ui64 saTabletId = 0; auto pathId = ResolvePathId(runtime, "/Root/Database/Table", nullptr, &saTabletId); diff --git a/ydb/core/statistics/service/ut/ut_column_statistics.cpp b/ydb/core/statistics/service/ut/ut_column_statistics.cpp index ffa53204e6d5..69aa5fa742d5 100644 --- a/ydb/core/statistics/service/ut/ut_column_statistics.cpp +++ b/ydb/core/statistics/service/ut/ut_column_statistics.cpp @@ -13,73 +13,32 @@ namespace NKikimr { namespace NStat { -struct TColumnStatisticsProbes { - struct TProbe { - ui64 Value; - ui64 Probe; - }; - - ui16 Tag; - std::vector Probes; -}; - -void CheckColumnStatistics( - TTestActorRuntime& runtime, const TPathId& pathId, const TActorId& sender, const std::vector& expected -) { - auto evGet = std::make_unique(); - evGet->StatType = NStat::EStatType::COUNT_MIN_SKETCH; - - for (auto item : expected) { - NStat::TRequest req; - req.PathId = pathId; - req.ColumnTag = item.Tag; - evGet->StatRequests.push_back(req); - } - - auto statServiceId = NStat::MakeStatServiceID(runtime.GetNodeId(0)); - runtime.Send(statServiceId, sender, evGet.release(), 0, true); - - auto res = runtime.GrabEdgeEventRethrow(sender); - auto msg = res->Get(); - - UNIT_ASSERT(msg->Success); - UNIT_ASSERT( msg->StatResponses.size() == expected.size()); - - for (size_t i = 0; i < msg->StatResponses.size(); ++i) { - const auto& stat = msg->StatResponses[i]; - UNIT_ASSERT(stat.Success); +namespace { - auto countMin = stat.CountMinSketch.CountMin.get(); - UNIT_ASSERT(countMin != nullptr); - - for (const auto& item : expected[i].Probes) { - ui64 value = item.Value; - auto probe = countMin->Probe((const char*)&value, sizeof(ui64)); - UNIT_ASSERT_VALUES_EQUAL(item.Probe, probe); - } - } +TTableInfo PrepareTable(TTestEnv& env, const TString& databaseName, const TString& tableName) { + auto tableInfo = CreateColumnTable(env, databaseName, tableName, 1); + InsertDataIntoTable(env, databaseName, tableName, RowsWithFewDistinctValues(1000)); + return tableInfo; } +} // namespace + Y_UNIT_TEST_SUITE(ColumnStatistics) { Y_UNIT_TEST(CountMinSketchStatistics) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); CreateDatabase(env, "Database"); - PrepareColumnTable(env, "Database", "Table1", 1); - ui64 saTabletId = 0; - auto pathId = ResolvePathId(runtime, "/Root/Database/Table1", nullptr, &saTabletId); + const auto tableInfo = PrepareTable(env, "Database", "Table1"); + Analyze(runtime, tableInfo.SaTabletId, {tableInfo.PathId}); - Analyze(runtime, saTabletId, {pathId}); - - std::vector expected = { + std::vector expected = { { - .Tag = 1, // Key column - .Probes{ {1, 4}, {2, 4} } + .Tag = 2, // Key column + .Probes{ {"1", 100}, {"2", 100} } } }; - auto sender = runtime.AllocateEdgeActor(); - CheckColumnStatistics(runtime, pathId, sender, expected); + CheckCountMinSketch(runtime, tableInfo.PathId, expected); } Y_UNIT_TEST(CountMinSketchServerlessStatistics) { @@ -90,27 +49,21 @@ Y_UNIT_TEST_SUITE(ColumnStatistics) { CreateServerlessDatabase(env, "Serverless1", "/Root/Shared", 1); CreateServerlessDatabase(env, "Serverless2", "/Root/Shared", 1); - PrepareColumnTable(env, "Serverless1", "Table1", 1); - PrepareColumnTable(env, "Serverless2", "Table2", 1); - - // Same SA tablet for both serverless databases - ui64 saTabletId = 0; - auto pathId1 = ResolvePathId(runtime, "/Root/Serverless1/Table1", nullptr, &saTabletId); - auto pathId2 = ResolvePathId(runtime, "/Root/Serverless2/Table2"); + const auto table1 = PrepareTable(env, "Serverless1", "Table1"); + const auto table2 = PrepareTable(env, "Serverless2", "Table2"); - Analyze(runtime, saTabletId, {pathId1}, "opId1", "/Root/Serverless1"); - Analyze(runtime, saTabletId, {pathId2}, "opId1", "/Root/Serverless2"); + Analyze(runtime, table1.SaTabletId, {table1.PathId}, "opId1", "/Root/Serverless1"); + Analyze(runtime, table2.SaTabletId, {table2.PathId}, "opId1", "/Root/Serverless2"); - auto sender = runtime.AllocateEdgeActor(); - std::vector expected = { + std::vector expected = { { - .Tag = 1, // Key column - .Probes{ {1, 4}, {2, 4} } + .Tag = 2, // Value column + .Probes{ {"1", 100}, {"2", 100} } } }; - CheckColumnStatistics(runtime, pathId1, sender, expected); - CheckColumnStatistics(runtime, pathId2, sender, expected); + CheckCountMinSketch(runtime, table1.PathId, expected); + CheckCountMinSketch(runtime, table2.PathId, expected); } } diff --git a/ydb/core/statistics/service/ut/ut_http_request.cpp b/ydb/core/statistics/service/ut/ut_http_request.cpp index fcc1f18930fc..aaf85c1e8151 100644 --- a/ydb/core/statistics/service/ut/ut_http_request.cpp +++ b/ydb/core/statistics/service/ut/ut_http_request.cpp @@ -17,7 +17,9 @@ TTableInfo PrepareDatabaseAndTable(TTestEnv& env, bool isServerless) { } else { CreateDatabase(env, "Database"); } - return PrepareColumnTable(env, "Database", "Table", 10); + auto info = PrepareColumnTable(env, "Database", "Table", 10); + InsertDataIntoTable(env, "Database", "Table", RowsWithFewDistinctValues(1000)); + return info; } void AnalyzeTest(bool isServerless) { diff --git a/ydb/core/statistics/ut_common/ut_common.cpp b/ydb/core/statistics/ut_common/ut_common.cpp index c6f932ce2127..fe680bcc69e2 100644 --- a/ydb/core/statistics/ut_common/ut_common.cpp +++ b/ydb/core/statistics/ut_common/ut_common.cpp @@ -248,7 +248,7 @@ void CreateUniformTable(TTestEnv& env, const TString& databaseName, const TStrin ExecuteYqlScript(env, Sprintf(R"( CREATE TABLE `Root/%s/%s` ( Key Uint64, - Value Uint64, + Value String, PRIMARY KEY (Key) ) WITH ( UNIFORM_PARTITIONS = 4 ); @@ -266,7 +266,7 @@ void PrepareUniformTable(TTestEnv& env, const TString& databaseName, const TStri replace << ", "; } ui64 value = 4000000000000000000ull * (i + 1); - replace << Sprintf("(%" PRIu64 "ul, %" PRIu64 "ul)", value, value); + replace << Sprintf("(%" PRIu64 "ul, \"%" PRIu64 "\")", value, value); } replace << ";"; ExecuteYqlScript(env, replace); @@ -299,7 +299,9 @@ TTableInfo CreateColumnTable(TTestEnv& env, const TString& databaseName, const T return tableInfo; } -void InsertDataIntoTable(TTestEnv& env, const TString& databaseName, const TString& tableName, size_t rowCount) { +void InsertDataIntoTable( + TTestEnv& env, const TString& databaseName, const TString& tableName, + std::vector insertedRows) { auto fullTableName = Sprintf("Root/%s/%s", databaseName.c_str(), tableName.c_str()); auto& runtime = *env.GetServer().GetRuntime(); @@ -319,11 +321,10 @@ void InsertDataIntoTable(TTestEnv& env, const TString& databaseName, const TStri reqValueType->mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); auto* reqRows = rows->mutable_value(); - - for (size_t i = 0; i < rowCount; ++i) { + for (const auto& inserted : insertedRows) { auto* row = reqRows->add_items(); - row->add_items()->set_uint64_value(i); - row->add_items()->set_bytes_value(ToString(i)); + row->add_items()->set_uint64_value(inserted.Key); + row->add_items()->set_bytes_value(inserted.Value); } auto future = NRpcService::DoLocalRpc( @@ -335,12 +336,16 @@ void InsertDataIntoTable(TTestEnv& env, const TString& databaseName, const TStri env.GetController()->WaitActualization(TDuration::Seconds(1)); } - TTableInfo PrepareColumnTable(TTestEnv& env, const TString& databaseName, const TString& tableName, int shardCount) { auto info = CreateColumnTable(env, databaseName, tableName, shardCount); - InsertDataIntoTable(env, databaseName, tableName, ColumnTableRowsNumber); + + std::vector rows; + for (size_t i = 0; i < ColumnTableRowsNumber; ++i) { + rows.push_back(TInsertedRow{.Key = i, .Value = ToString(i)}); + } + InsertDataIntoTable(env, databaseName, tableName, rows); return info; } @@ -399,7 +404,7 @@ TTableInfo PrepareColumnTableWithIndexes(TTestEnv& env, const TString& databaseN auto future = NRpcService::DoLocalRpc( std::move(request), "", "", runtime.GetActorSystem(0)); auto response = runtime.WaitFuture(std::move(future)); - + UNIT_ASSERT(response.operation().ready()); UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); } @@ -409,6 +414,14 @@ TTableInfo PrepareColumnTableWithIndexes(TTestEnv& env, const TString& databaseN return info; } +std::vector RowsWithFewDistinctValues(size_t count) { + std::vector rows; + for (size_t i = 0; i < count; ++i) { + rows.push_back(TInsertedRow{.Key = i, .Value = ToString(i % 10)}); + } + return rows; +} + void DropTable(TTestEnv& env, const TString& databaseName, const TString& tableName) { ExecuteYqlScript(env, Sprintf(R"( DROP TABLE `Root/%s/%s`; @@ -442,16 +455,6 @@ std::shared_ptr ExtractCountMin(TTestActorRuntime& runtime, con return stat.CountMin; } -void ValidateCountMinDatashard(TTestActorRuntime& runtime, TPathId pathId) { - auto countMin = ExtractCountMin(runtime, pathId); - - for (ui32 i = 0; i < 4; ++i) { - ui64 value = 4000000000000000000ull * (i + 1); - auto probe = countMin->Probe((const char *)&value, sizeof(ui64)); - UNIT_ASSERT_VALUES_EQUAL(probe, 1); - } -} - void ValidateCountMinAbsence(TTestActorRuntime& runtime, TPathId pathId) { auto statServiceId = NStat::MakeStatServiceID(runtime.GetNodeId(1)); @@ -475,6 +478,43 @@ void ValidateCountMinAbsence(TTestActorRuntime& runtime, TPathId pathId) { UNIT_ASSERT(!rsp.Success); } +void CheckCountMinSketch( + TTestActorRuntime& runtime, const TPathId& pathId, + const std::vector& expected) { + auto evGet = std::make_unique(); + evGet->StatType = NStat::EStatType::COUNT_MIN_SKETCH; + + for (auto item : expected) { + NStat::TRequest req; + req.PathId = pathId; + req.ColumnTag = item.Tag; + evGet->StatRequests.push_back(req); + } + + auto sender = runtime.AllocateEdgeActor(); + auto statServiceId = NStat::MakeStatServiceID(runtime.GetNodeId(0)); + runtime.Send(statServiceId, sender, evGet.release(), 0, true); + + auto res = runtime.GrabEdgeEventRethrow(sender); + auto msg = res->Get(); + + UNIT_ASSERT(msg->Success); + UNIT_ASSERT( msg->StatResponses.size() == expected.size()); + + for (size_t i = 0; i < msg->StatResponses.size(); ++i) { + const auto& stat = msg->StatResponses[i]; + UNIT_ASSERT(stat.Success); + + auto countMin = stat.CountMinSketch.CountMin.get(); + UNIT_ASSERT(countMin != nullptr); + + for (const auto& item : expected[i].Probes) { + auto probe = countMin->Probe(item.Value.data(), item.Value.size()); + UNIT_ASSERT_VALUES_EQUAL(item.Expected, probe); + } + } +} + TAnalyzedTable::TAnalyzedTable(const TPathId& pathId) : PathId(pathId) {} diff --git a/ydb/core/statistics/ut_common/ut_common.h b/ydb/core/statistics/ut_common/ut_common.h index f63a6a97dbaa..e306e006708c 100644 --- a/ydb/core/statistics/ut_common/ut_common.h +++ b/ydb/core/statistics/ut_common/ut_common.h @@ -82,7 +82,14 @@ struct TTableInfo { // Create empty column table with the requested number of shards. TTableInfo CreateColumnTable(TTestEnv& env, const TString& databaseName, const TString& tableName, int shardCount); -void InsertDataIntoTable(TTestEnv& env, const TString& databaseName, const TString& tableName, size_t rowCount); +struct TInsertedRow { + ui64 Key; + TString Value; +}; + +void InsertDataIntoTable( + TTestEnv& env, const TString& databaseName, const TString& tableName, + std::vector rows); // Create a column table and insert ColumnTableRowsNumber rows. TTableInfo PrepareColumnTable(TTestEnv& env, const TString& databaseName, const TString& tableName, int shardCount); @@ -91,6 +98,8 @@ TTableInfo PrepareColumnTable(TTestEnv& env, const TString& databaseName, const // and insert ColumnTableRowsNumber rows with some overlap to trigger compaction. TTableInfo PrepareColumnTableWithIndexes(TTestEnv& env, const TString& databaseName, const TString& tableName, int shardCount); +std::vector RowsWithFewDistinctValues(size_t count); + TPathId ResolvePathId(TTestActorRuntime& runtime, const TString& path, TPathId* domainKey = nullptr, ui64* saTabletId = nullptr); NKikimrScheme::TEvDescribeSchemeResult DescribeTable( @@ -107,9 +116,22 @@ void DropTable(TTestEnv& env, const TString& databaseName, const TString& tableN std::shared_ptr ExtractCountMin(TTestActorRuntime& runtime, const TPathId& pathId, ui64 columnTag = 1); -void ValidateCountMinDatashard(TTestActorRuntime& runtime, TPathId pathId); void ValidateCountMinAbsence(TTestActorRuntime& runtime, TPathId pathId); +struct TCountMinSketchProbes { + struct TProbe { + TString Value; + ui64 Expected; + }; + + ui16 Tag; + std::vector Probes; +}; + +void CheckCountMinSketch( + TTestActorRuntime& runtime, const TPathId& pathId, + const std::vector& expected); + struct TAnalyzedTable { TPathId PathId; std::vector ColumnTags; From 443978fabd9c64cd9ed9b6f133865cb31f76dce5 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Mon, 8 Dec 2025 19:16:22 +0300 Subject: [PATCH 12/13] review fixes --- .../statistics/aggregator/analyze_actor.cpp | 19 ++++++++++++++----- .../statistics/aggregator/analyze_actor.h | 2 +- .../service/ut/ut_column_statistics.cpp | 2 +- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/ydb/core/statistics/aggregator/analyze_actor.cpp b/ydb/core/statistics/aggregator/analyze_actor.cpp index ff804e857322..db181b422c2d 100644 --- a/ydb/core/statistics/aggregator/analyze_actor.cpp +++ b/ydb/core/statistics/aggregator/analyze_actor.cpp @@ -24,14 +24,16 @@ class TCMSEval : public IColumnStatisticEval { static std::optional MaybeCreate( const NKikimrStat::TSimpleColumnStatistics& simpleStats, const NScheme::TTypeInfo&) { - if (simpleStats.GetCountDistinct() >= 0.8 * simpleStats.GetCount()) { + if (simpleStats.GetCount() == 0 || simpleStats.GetCountDistinct() == 0) { + // Empty table return std::nullopt; } const double n = simpleStats.GetCount(); const double ndv = simpleStats.GetCountDistinct(); - if (ndv == 0) { - return TCMSEval(MIN_WIDTH); + + if (ndv >= 0.8 * n) { + return std::nullopt; } const double c = 10; @@ -208,7 +210,7 @@ void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& for (const auto& colTag : RequestedColumnTags) { auto colIt = tag2Column.find(colTag); if (colIt == tag2Column.end()) { - // Column probably already deleted, skip it. + // Column probably already dropped, skip it. continue; } addColumn(colIt->second); @@ -219,6 +221,14 @@ void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& } } + if (Columns.empty()) { + // All requested columns were already dropped. Send empty response right away. + auto response = std::make_unique(std::move(Results)); + Send(Parent, response.release()); + PassAway(); + return; + } + Become(&TThis::StateQueryStage1); auto actor = std::make_unique( SelfId(), DatabaseName, stage1Builder.Build(TableName), stage1Builder.ColumnCount()); @@ -263,7 +273,6 @@ void TAnalyzeActor::HandleStage1(TEvPrivate::TEvAnalyzeScanResult::TPtr& ev) { continue; } if (statEval->EstimateSize() >= 4_MB) { - // To avoid: Error: ydb/library/yql/dq/runtime/dq_output_channel.cpp:405: Row data size is too big: 53839241 bytes, exceeds limit of 50331648 bytes continue; } statEval->AddAggregations(col.Name, stage2Builder); diff --git a/ydb/core/statistics/aggregator/analyze_actor.h b/ydb/core/statistics/aggregator/analyze_actor.h index 8d78d66bbb51..022ffdb0cc43 100644 --- a/ydb/core/statistics/aggregator/analyze_actor.h +++ b/ydb/core/statistics/aggregator/analyze_actor.h @@ -55,7 +55,7 @@ class TAnalyzeActor : public NActors::TActorBootstrapped { , Name(std::move(name)) {} TColumnDesc(TColumnDesc&&) noexcept = default; - TColumnDesc& operator=(TColumnDesc&) noexcept = default; + TColumnDesc& operator=(TColumnDesc&&) noexcept = default; NKikimrStat::TSimpleColumnStatistics ExtractSimpleStats( ui64 count, const TVector& aggColumns) const; diff --git a/ydb/core/statistics/service/ut/ut_column_statistics.cpp b/ydb/core/statistics/service/ut/ut_column_statistics.cpp index 69aa5fa742d5..c310ac14440d 100644 --- a/ydb/core/statistics/service/ut/ut_column_statistics.cpp +++ b/ydb/core/statistics/service/ut/ut_column_statistics.cpp @@ -34,7 +34,7 @@ Y_UNIT_TEST_SUITE(ColumnStatistics) { std::vector expected = { { - .Tag = 2, // Key column + .Tag = 2, // Value column .Probes{ {"1", 100}, {"2", 100} } } }; From 069fded7d08ab23a11395828284577744fa21cce Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Mon, 8 Dec 2025 19:40:54 +0300 Subject: [PATCH 13/13] tests: single function to check count-min values/absence --- .../aggregator/ut/ut_analyze_datashard.cpp | 12 +++++- .../aggregator/ut/ut_traverse_datashard.cpp | 8 ++++ .../service/ut/ut_column_statistics.cpp | 34 ++++++++------- ydb/core/statistics/ut_common/ut_common.cpp | 43 ++++++------------- ydb/core/statistics/ut_common/ut_common.h | 5 +-- 5 files changed, 50 insertions(+), 52 deletions(-) diff --git a/ydb/core/statistics/aggregator/ut/ut_analyze_datashard.cpp b/ydb/core/statistics/aggregator/ut/ut_analyze_datashard.cpp index 0d52ca5ee150..3e94f404b188 100644 --- a/ydb/core/statistics/aggregator/ut/ut_analyze_datashard.cpp +++ b/ydb/core/statistics/aggregator/ut/ut_analyze_datashard.cpp @@ -19,9 +19,13 @@ void PrepareTable(TTestEnv& env, const TString& tableName) { void ValidateCountMinSketch(TTestActorRuntime& runtime, const TPathId& pathId) { std::vector expected = { + { + .Tag = 1, // Key column + .Probes = std::nullopt, + }, { .Tag = 2, // Value column - .Probes{ {"1", 100}, {"2", 100}, {"10", 0} } + .Probes = { { {"1", 100}, {"2", 100}, {"10", 0} } } } }; @@ -84,7 +88,11 @@ Y_UNIT_TEST_SUITE(AnalyzeDatashard) { runtime, saTabletId, {pathId}, "operationId", {}, NKikimrStat::TEvAnalyzeResponse::STATUS_ERROR); - ValidateCountMinAbsence(runtime, pathId); + std::vector expected = { + { .Tag = 1, .Probes = std::nullopt }, + { .Tag = 2, .Probes = std::nullopt }, + }; + CheckCountMinSketch(runtime, pathId, expected); } } diff --git a/ydb/core/statistics/aggregator/ut/ut_traverse_datashard.cpp b/ydb/core/statistics/aggregator/ut/ut_traverse_datashard.cpp index 102f31c3fee3..d1da8b187713 100644 --- a/ydb/core/statistics/aggregator/ut/ut_traverse_datashard.cpp +++ b/ydb/core/statistics/aggregator/ut/ut_traverse_datashard.cpp @@ -19,6 +19,14 @@ TTestEnv CreateTestEnv() { }); } +void ValidateCountMinAbsence(TTestActorRuntime& runtime, TPathId pathId) { + std::vector expected = { + { .Tag = 1, .Probes = std::nullopt }, + { .Tag = 2, .Probes = std::nullopt }, + }; + CheckCountMinSketch(runtime, pathId, expected); +} + } Y_UNIT_TEST_SUITE(TraverseDatashard) { diff --git a/ydb/core/statistics/service/ut/ut_column_statistics.cpp b/ydb/core/statistics/service/ut/ut_column_statistics.cpp index c310ac14440d..0cb9307df17d 100644 --- a/ydb/core/statistics/service/ut/ut_column_statistics.cpp +++ b/ydb/core/statistics/service/ut/ut_column_statistics.cpp @@ -21,6 +21,21 @@ TTableInfo PrepareTable(TTestEnv& env, const TString& databaseName, const TStrin return tableInfo; } +void ValidateCountMinSketch(TTestActorRuntime& runtime, const TPathId& pathId) { + std::vector expected = { + { + .Tag = 1, // Key column + .Probes = std::nullopt, + }, + { + .Tag = 2, // Value column + .Probes = { { {"1", 100}, {"2", 100}, {"10", 0} } } + } + }; + + CheckCountMinSketch(runtime, pathId, expected); +} + } // namespace Y_UNIT_TEST_SUITE(ColumnStatistics) { @@ -32,13 +47,7 @@ Y_UNIT_TEST_SUITE(ColumnStatistics) { const auto tableInfo = PrepareTable(env, "Database", "Table1"); Analyze(runtime, tableInfo.SaTabletId, {tableInfo.PathId}); - std::vector expected = { - { - .Tag = 2, // Value column - .Probes{ {"1", 100}, {"2", 100} } - } - }; - CheckCountMinSketch(runtime, tableInfo.PathId, expected); + ValidateCountMinSketch(runtime, tableInfo.PathId); } Y_UNIT_TEST(CountMinSketchServerlessStatistics) { @@ -55,15 +64,8 @@ Y_UNIT_TEST_SUITE(ColumnStatistics) { Analyze(runtime, table1.SaTabletId, {table1.PathId}, "opId1", "/Root/Serverless1"); Analyze(runtime, table2.SaTabletId, {table2.PathId}, "opId1", "/Root/Serverless2"); - std::vector expected = { - { - .Tag = 2, // Value column - .Probes{ {"1", 100}, {"2", 100} } - } - }; - - CheckCountMinSketch(runtime, table1.PathId, expected); - CheckCountMinSketch(runtime, table2.PathId, expected); + ValidateCountMinSketch(runtime, table1.PathId); + ValidateCountMinSketch(runtime, table2.PathId); } } diff --git a/ydb/core/statistics/ut_common/ut_common.cpp b/ydb/core/statistics/ut_common/ut_common.cpp index fe680bcc69e2..5e9ef1209a62 100644 --- a/ydb/core/statistics/ut_common/ut_common.cpp +++ b/ydb/core/statistics/ut_common/ut_common.cpp @@ -455,29 +455,6 @@ std::shared_ptr ExtractCountMin(TTestActorRuntime& runtime, con return stat.CountMin; } -void ValidateCountMinAbsence(TTestActorRuntime& runtime, TPathId pathId) { - auto statServiceId = NStat::MakeStatServiceID(runtime.GetNodeId(1)); - - NStat::TRequest req; - req.PathId = pathId; - req.ColumnTag = 1; - - auto evGet = std::make_unique(); - evGet->StatType = NStat::EStatType::COUNT_MIN_SKETCH; - evGet->StatRequests.push_back(req); - - auto sender = runtime.AllocateEdgeActor(1); - runtime.Send(statServiceId, sender, evGet.release(), 1, true); - auto evResult = runtime.GrabEdgeEventRethrow(sender); - - UNIT_ASSERT(evResult); - UNIT_ASSERT(evResult->Get()); - UNIT_ASSERT(evResult->Get()->StatResponses.size() == 1); - - auto rsp = evResult->Get()->StatResponses[0]; - UNIT_ASSERT(!rsp.Success); -} - void CheckCountMinSketch( TTestActorRuntime& runtime, const TPathId& pathId, const std::vector& expected) { @@ -498,19 +475,23 @@ void CheckCountMinSketch( auto res = runtime.GrabEdgeEventRethrow(sender); auto msg = res->Get(); - UNIT_ASSERT(msg->Success); - UNIT_ASSERT( msg->StatResponses.size() == expected.size()); + UNIT_ASSERT_VALUES_EQUAL(msg->StatResponses.size(), expected.size()); for (size_t i = 0; i < msg->StatResponses.size(); ++i) { const auto& stat = msg->StatResponses[i]; - UNIT_ASSERT(stat.Success); + const auto& probes = expected[i].Probes; + if (probes) { + UNIT_ASSERT(stat.Success); - auto countMin = stat.CountMinSketch.CountMin.get(); - UNIT_ASSERT(countMin != nullptr); + auto countMin = stat.CountMinSketch.CountMin.get(); + UNIT_ASSERT(countMin != nullptr); - for (const auto& item : expected[i].Probes) { - auto probe = countMin->Probe(item.Value.data(), item.Value.size()); - UNIT_ASSERT_VALUES_EQUAL(item.Expected, probe); + for (const auto& item : *probes) { + auto probe = countMin->Probe(item.Value.data(), item.Value.size()); + UNIT_ASSERT_VALUES_EQUAL(item.Expected, probe); + } + } else { + UNIT_ASSERT(!stat.Success); } } } diff --git a/ydb/core/statistics/ut_common/ut_common.h b/ydb/core/statistics/ut_common/ut_common.h index e306e006708c..27601b961ca7 100644 --- a/ydb/core/statistics/ut_common/ut_common.h +++ b/ydb/core/statistics/ut_common/ut_common.h @@ -116,8 +116,6 @@ void DropTable(TTestEnv& env, const TString& databaseName, const TString& tableN std::shared_ptr ExtractCountMin(TTestActorRuntime& runtime, const TPathId& pathId, ui64 columnTag = 1); -void ValidateCountMinAbsence(TTestActorRuntime& runtime, TPathId pathId); - struct TCountMinSketchProbes { struct TProbe { TString Value; @@ -125,7 +123,8 @@ struct TCountMinSketchProbes { }; ui16 Tag; - std::vector Probes; + // If nullopt, absence of count-min sketch is expected. + std::optional> Probes; }; void CheckCountMinSketch(