From 16b9cbbca450d5b281bcbe82407c08d010347feb Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 18 Dec 2025 13:39:13 +0000 Subject: [PATCH] Merge pull request #92250 from azat/min_columns_to_activate_adaptive_write_buffer Reduce INSERT/merges memory usage with wide parts for very wide tables by enabling adaptive write buffers --- src/Core/SettingsChangesHistory.cpp | 39 ++++++++++++++++++ src/Disks/DiskEncryptedTransaction.cpp | 3 +- src/IO/WriteBufferFromEncryptedFile.cpp | 15 ++++++- src/IO/WriteBufferFromEncryptedFile.h | 7 +++- src/IO/tests/gtest_file_encryption.cpp | 2 +- src/IO/tests/gtest_writebuffer_s3.cpp | 1 - .../MergeTree/MergeTreeDataPartWriterWide.cpp | 4 +- .../MergeTree/MergeTreeIOSettings.cpp | 2 + src/Storages/MergeTree/MergeTreeIOSettings.h | 1 + src/Storages/MergeTree/MergeTreeSettings.cpp | 6 +++ ...o_activate_adaptive_write_buffer.reference | 4 ++ ...s_to_activate_adaptive_write_buffer.sql.j2 | 41 +++++++++++++++++++ 12 files changed, 117 insertions(+), 8 deletions(-) create mode 100644 tests/queries/0_stateless/03770_min_columns_to_activate_adaptive_write_buffer.reference create mode 100644 tests/queries/0_stateless/03770_min_columns_to_activate_adaptive_write_buffer.sql.j2 diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 9dc03ed64207..e5e9110eb7d1 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -858,6 +858,45 @@ const VersionToSettingsChangesMap & getMergeTreeSettingsChangesHistory() static std::once_flag initialized_flag; std::call_once(initialized_flag, [&] { + addSettingsChanges(merge_tree_settings_changes_history, "25.12", + { + {"alter_column_secondary_index_mode", "compatibility", "rebuild", "Change the behaviour to allow ALTER `column` when they have dependent secondary indices"}, + {"merge_selector_enable_heuristic_to_lower_max_parts_to_merge_at_once", false, false, "New setting"}, + 
{"merge_selector_heuristic_to_lower_max_parts_to_merge_at_once_exponent", 5, 5, "New setting"}, + {"min_columns_to_activate_adaptive_write_buffer", 500, 500, "New setting"}, + {"nullable_serialization_version", "basic", "basic", "New setting"}, + }); + addSettingsChanges(merge_tree_settings_changes_history, "25.11", + { + {"merge_max_dynamic_subcolumns_in_wide_part", "auto", "auto", "Add a new setting to limit number of dynamic subcolumns in Wide part after merge regardless the parameters specified in the data type"}, + {"refresh_statistics_interval", 0, 0, "New setting"}, + {"shared_merge_tree_create_per_replica_metadata_nodes", true, false, "Reduce the amount of metadata in Keeper."}, + {"serialization_info_version", "basic", "with_types", "Change to the newer format allowing custom string serialization"}, + {"string_serialization_version", "single_stream", "with_size_stream", "Change to the newer format with separate sizes"}, + {"escape_variant_subcolumn_filenames", false, true, "Escape special symbols for filenames created for Variant type subcolumns in Wide parts"}, + }); + addSettingsChanges(merge_tree_settings_changes_history, "25.10", + { + {"auto_statistics_types", "", "", "New setting"}, + {"exclude_materialize_skip_indexes_on_merge", "", "", "New setting."}, + {"serialization_info_version", "basic", "basic", "New setting"}, + {"string_serialization_version", "single_stream", "single_stream", "New setting"}, + {"replicated_deduplication_window_seconds", 7 * 24 * 60 * 60, 60*60, "decrease default value"}, + {"shared_merge_tree_activate_coordinated_merges_tasks", false, false, "New settings"}, + {"shared_merge_tree_merge_coordinator_factor", 1.1f, 1.1f, "Lower coordinator sleep time after load"}, + {"min_level_for_wide_part", 0, 0, "New setting"}, + {"min_level_for_full_part_storage", 0, 0, "New setting"}, + }); + addSettingsChanges(merge_tree_settings_changes_history, "25.9", + { + {"vertical_merge_optimize_lightweight_delete", false, true, "New setting"}, 
+ {"replicated_deduplication_window", 1000, 10000, "increase default value"}, + {"shared_merge_tree_enable_automatic_empty_partitions_cleanup", false, false, "New setting"}, + {"shared_merge_tree_empty_partition_lifetime", 86400, 86400, "New setting"}, + {"shared_merge_tree_outdated_parts_group_size", 2, 2, "New setting"}, + {"shared_merge_tree_use_outdated_parts_compact_format", false, true, "Enable outdated parts v3 by default"}, + {"shared_merge_tree_activate_coordinated_merges_tasks", false, false, "New settings"}, + }); addSettingsChanges(merge_tree_settings_changes_history, "25.8", { {"object_serialization_version", "v2", "v2", "Add a setting to control JSON serialization versions"}, diff --git a/src/Disks/DiskEncryptedTransaction.cpp b/src/Disks/DiskEncryptedTransaction.cpp index a528564fd1e4..d9d4f419502c 100644 --- a/src/Disks/DiskEncryptedTransaction.cpp +++ b/src/Disks/DiskEncryptedTransaction.cpp @@ -91,8 +91,7 @@ std::unique_ptr DiskEncryptedTransaction::writeFile( // header.init_vector = FileEncryption::InitVector::random(); } auto buffer = delegate_transaction->writeFile(wrapped_path, buf_size, mode, settings, autocommit); - return std::make_unique(buf_size, std::move(buffer), key, header, old_file_size); - + return std::make_unique(buf_size, std::move(buffer), key, header, old_file_size, settings.use_adaptive_write_buffer, settings.adaptive_write_buffer_initial_size); } } diff --git a/src/IO/WriteBufferFromEncryptedFile.cpp b/src/IO/WriteBufferFromEncryptedFile.cpp index 693f422c549f..a2e7cfc6aa7d 100644 --- a/src/IO/WriteBufferFromEncryptedFile.cpp +++ b/src/IO/WriteBufferFromEncryptedFile.cpp @@ -11,11 +11,15 @@ WriteBufferFromEncryptedFile::WriteBufferFromEncryptedFile( std::unique_ptr out_, const String & key_, const FileEncryption::Header & header_, - size_t old_file_size) - : WriteBufferDecorator(std::move(out_), buffer_size_, nullptr, 0) + size_t old_file_size, + bool use_adaptive_buffer_size_, + size_t adaptive_buffer_initial_size) + : 
WriteBufferDecorator(std::move(out_), use_adaptive_buffer_size_ ? adaptive_buffer_initial_size : buffer_size_, nullptr, 0) , header(header_) , flush_header(!old_file_size) , encryptor(header.algorithm, key_, header.init_vector) + , use_adaptive_buffer_size(use_adaptive_buffer_size_) + , adaptive_buffer_max_size(buffer_size_) { encryptor.setOffset(old_file_size); } @@ -57,6 +61,13 @@ void WriteBufferFromEncryptedFile::nextImpl() } encryptor.encrypt(working_buffer.begin(), offset(), *out); + + /// Increase buffer size for next data if adaptive buffer size is used and nextImpl was called because of end of buffer. + if (!available() && use_adaptive_buffer_size && memory.size() < adaptive_buffer_max_size) + { + memory.resize(std::min(memory.size() * 2, adaptive_buffer_max_size)); + BufferBase::set(memory.data(), memory.size(), 0); + } } } diff --git a/src/IO/WriteBufferFromEncryptedFile.h b/src/IO/WriteBufferFromEncryptedFile.h index 2b59bb468d13..820010b6a145 100644 --- a/src/IO/WriteBufferFromEncryptedFile.h +++ b/src/IO/WriteBufferFromEncryptedFile.h @@ -22,7 +22,9 @@ class WriteBufferFromEncryptedFile : public WriteBufferDecorator out_, const String & key_, const FileEncryption::Header & header_, - size_t old_file_size = 0); + size_t old_file_size, + bool use_adaptive_buffer_size_, + size_t adaptive_buffer_initial_size); ~WriteBufferFromEncryptedFile() override; @@ -41,6 +43,9 @@ class WriteBufferFromEncryptedFile : public WriteBufferDecorator(tmp_path); - WriteBufferFromEncryptedFile wb(10, std::move(lwb), key, header); + WriteBufferFromEncryptedFile wb(10, std::move(lwb), key, header, /*old_file_size=*/0, /*use_adaptive_buffer_size_=*/ false, /*adaptive_buffer_initial_size=*/ 0); auto data = getRandomASCIIString(20); wb.write(data.data(), data.size()); wb.finalize(); diff --git a/src/IO/tests/gtest_writebuffer_s3.cpp b/src/IO/tests/gtest_writebuffer_s3.cpp index 444c19457dd8..7f596b1a64f3 100644 --- a/src/IO/tests/gtest_writebuffer_s3.cpp +++ 
b/src/IO/tests/gtest_writebuffer_s3.cpp @@ -24,7 +24,6 @@ #include #include #include -#include #include #include #include diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index 15b555466233..c1e1165e2354 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -188,7 +188,9 @@ void MergeTreeDataPartWriterWide::addStreams( max_compress_block_size = settings.max_compress_block_size; WriteSettings query_write_settings = settings.query_write_settings; - query_write_settings.use_adaptive_write_buffer = settings.use_adaptive_write_buffer_for_dynamic_subcolumns && ISerialization::isDynamicSubcolumn(substream_path, substream_path.size()); + query_write_settings.use_adaptive_write_buffer = + (settings.min_columns_to_activate_adaptive_write_buffer && columns_list.size() >= settings.min_columns_to_activate_adaptive_write_buffer) + || (settings.use_adaptive_write_buffer_for_dynamic_subcolumns && ISerialization::isDynamicSubcolumn(substream_path, substream_path.size())); query_write_settings.adaptive_write_buffer_initial_size = settings.adaptive_write_buffer_initial_size; column_streams[stream_name] = std::make_unique>( diff --git a/src/Storages/MergeTree/MergeTreeIOSettings.cpp b/src/Storages/MergeTree/MergeTreeIOSettings.cpp index 528f6219227b..480399f09076 100644 --- a/src/Storages/MergeTree/MergeTreeIOSettings.cpp +++ b/src/Storages/MergeTree/MergeTreeIOSettings.cpp @@ -37,6 +37,7 @@ namespace MergeTreeSetting { extern const MergeTreeSettingsBool compress_primary_key; extern const MergeTreeSettingsBool use_adaptive_write_buffer_for_dynamic_subcolumns; + extern const MergeTreeSettingsUInt64 min_columns_to_activate_adaptive_write_buffer; extern const MergeTreeSettingsBool use_compact_variant_discriminators_serialization; extern const MergeTreeSettingsNonZeroUInt64 marks_compress_block_size; extern const MergeTreeSettingsString 
marks_compression_codec; @@ -84,6 +85,7 @@ MergeTreeWriterSettings::MergeTreeWriterSettings( , object_shared_data_serialization_version(data_part->isZeroLevel() ? (*storage_settings)[MergeTreeSetting::object_shared_data_serialization_version_for_zero_level_parts] : (*storage_settings)[MergeTreeSetting::object_shared_data_serialization_version]) , object_shared_data_buckets(isCompactPart(data_part) ? (*storage_settings)[MergeTreeSetting::object_shared_data_buckets_for_compact_part] : (*storage_settings)[MergeTreeSetting::object_shared_data_buckets_for_wide_part]) , use_adaptive_write_buffer_for_dynamic_subcolumns((*storage_settings)[MergeTreeSetting::use_adaptive_write_buffer_for_dynamic_subcolumns]) + , min_columns_to_activate_adaptive_write_buffer((*storage_settings)[MergeTreeSetting::min_columns_to_activate_adaptive_write_buffer]) , adaptive_write_buffer_initial_size((*storage_settings)[MergeTreeSetting::adaptive_write_buffer_initial_size]) { } diff --git a/src/Storages/MergeTree/MergeTreeIOSettings.h b/src/Storages/MergeTree/MergeTreeIOSettings.h index 50f6e3998682..acaff7c011bd 100644 --- a/src/Storages/MergeTree/MergeTreeIOSettings.h +++ b/src/Storages/MergeTree/MergeTreeIOSettings.h @@ -106,6 +106,7 @@ struct MergeTreeWriterSettings MergeTreeObjectSharedDataSerializationVersion object_shared_data_serialization_version; size_t object_shared_data_buckets = 1; bool use_adaptive_write_buffer_for_dynamic_subcolumns; + size_t min_columns_to_activate_adaptive_write_buffer; size_t adaptive_write_buffer_initial_size; }; diff --git a/src/Storages/MergeTree/MergeTreeSettings.cpp b/src/Storages/MergeTree/MergeTreeSettings.cpp index c73ca49f808c..97528cc9c55d 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.cpp +++ b/src/Storages/MergeTree/MergeTreeSettings.cpp @@ -846,6 +846,12 @@ namespace ErrorCodes DECLARE(UInt64, adaptive_write_buffer_initial_size, 16 * 1024, R"( Initial size of an adaptive write buffer )", 0) \ + DECLARE(UInt64, 
min_columns_to_activate_adaptive_write_buffer, 500, R"( + Allow to reduce memory usage for tables with lots of columns by using adaptive write buffers. + Possible values: + - 0 - disabled + - 1 - always enabled + )", 0) \ DECLARE(UInt64, min_free_disk_bytes_to_perform_insert, 0, R"( The minimum number of bytes that should be free in disk space in order to insert data. If the number of available free bytes is less than diff --git a/tests/queries/0_stateless/03770_min_columns_to_activate_adaptive_write_buffer.reference b/tests/queries/0_stateless/03770_min_columns_to_activate_adaptive_write_buffer.reference new file mode 100644 index 000000000000..4feffd98cea5 --- /dev/null +++ b/tests/queries/0_stateless/03770_min_columns_to_activate_adaptive_write_buffer.reference @@ -0,0 +1,4 @@ +metric_log_0 Compact Horizontal 400000000 +metric_log_0 Wide Horizontal 6000000000 +metric_log_1 Compact Horizontal 400000000 +metric_log_1 Wide Horizontal 400000000 diff --git a/tests/queries/0_stateless/03770_min_columns_to_activate_adaptive_write_buffer.sql.j2 b/tests/queries/0_stateless/03770_min_columns_to_activate_adaptive_write_buffer.sql.j2 new file mode 100644 index 000000000000..8aa306f12dc1 --- /dev/null +++ b/tests/queries/0_stateless/03770_min_columns_to_activate_adaptive_write_buffer.sql.j2 @@ -0,0 +1,41 @@ +-- Tags: no-debug, long +-- - no-debug - debug build uses S3 and this in combination works very slowly +-- Note, all tests avoid vertical merges (enable_vertical_merge_algorithm=0), since they write column by column and do not have excessive memory usage. 
+ +drop table if exists metric_log; +drop table if exists metric_log_adaptive; + +-- Nothing better than production table +system flush logs system.metric_log; + +{% for v in [0, 1] %} +create table metric_log_{{ v }} as system.metric_log engine=MergeTree order by () settings min_columns_to_activate_adaptive_write_buffer={{ v }}, min_bytes_for_wide_part=1e9, enable_vertical_merge_algorithm=0, auto_statistics_types=''; +insert into metric_log_{{ v }} select * from generateRandom() limit 1 settings max_memory_usage = '100Mi' /* usually few tens of MiB */; +optimize table metric_log_{{ v }} final; +truncate table metric_log_{{ v }}; +alter table metric_log_{{ v }} modify setting min_bytes_for_wide_part=0; +insert into metric_log_{{ v }} select * from generateRandom() limit 1 settings max_memory_usage = '{{ 400 if v == 1 else 6000 }}Mi' /* w/o adaptive buffers uses ~3 (for regular) and ~5-6 (for encrypted disks), w/ 100MiB for regular and ~200-400MiB for encrypted disks */; +optimize table metric_log_{{ v }} final; +drop table metric_log_{{ v }}; +{% endfor %} + +-- Flush system tables all at once +system flush logs part_log; +select + table, + part_type, + merge_algorithm, + max2( + peak_memory_usage, + multiIf( + -- see comments for max_memory_usage + part_type = 'Compact', 400e6, + table = 'metric_log_1', 400e6, + table = 'metric_log_0', 6e9, + 0. + ) + ) +from system.part_log +where database = currentDatabase() and event_type = 'MergeParts' +order by event_time_microseconds;