Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,45 @@ const VersionToSettingsChangesMap & getMergeTreeSettingsChangesHistory()
static std::once_flag initialized_flag;
std::call_once(initialized_flag, [&]
{
addSettingsChanges(merge_tree_settings_changes_history, "25.12",
{
{"alter_column_secondary_index_mode", "compatibility", "rebuild", "Change the behaviour to allow ALTER `column` when they have dependent secondary indices"},
{"merge_selector_enable_heuristic_to_lower_max_parts_to_merge_at_once", false, false, "New setting"},
{"merge_selector_heuristic_to_lower_max_parts_to_merge_at_once_exponent", 5, 5, "New setting"},
{"min_columns_to_activate_adaptive_write_buffer", 500, 500, "New setting"},
{"nullable_serialization_version", "basic", "basic", "New setting"},
});
addSettingsChanges(merge_tree_settings_changes_history, "25.11",
{
{"merge_max_dynamic_subcolumns_in_wide_part", "auto", "auto", "Add a new setting to limit number of dynamic subcolumns in Wide part after merge regardless the parameters specified in the data type"},
{"refresh_statistics_interval", 0, 0, "New setting"},
{"shared_merge_tree_create_per_replica_metadata_nodes", true, false, "Reduce the amount of metadata in Keeper."},
{"serialization_info_version", "basic", "with_types", "Change to the newer format allowing custom string serialization"},
{"string_serialization_version", "single_stream", "with_size_stream", "Change to the newer format with separate sizes"},
{"escape_variant_subcolumn_filenames", false, true, "Escape special symbols for filenames created for Variant type subcolumns in Wide parts"},
});
addSettingsChanges(merge_tree_settings_changes_history, "25.10",
{
{"auto_statistics_types", "", "", "New setting"},
{"exclude_materialize_skip_indexes_on_merge", "", "", "New setting."},
{"serialization_info_version", "basic", "basic", "New setting"},
{"string_serialization_version", "single_stream", "single_stream", "New setting"},
{"replicated_deduplication_window_seconds", 7 * 24 * 60 * 60, 60*60, "decrease default value"},
{"shared_merge_tree_activate_coordinated_merges_tasks", false, false, "New settings"},
{"shared_merge_tree_merge_coordinator_factor", 1.1f, 1.1f, "Lower coordinator sleep time after load"},
{"min_level_for_wide_part", 0, 0, "New setting"},
{"min_level_for_full_part_storage", 0, 0, "New setting"},
});
addSettingsChanges(merge_tree_settings_changes_history, "25.9",
{
{"vertical_merge_optimize_lightweight_delete", false, true, "New setting"},
{"replicated_deduplication_window", 1000, 10000, "increase default value"},
{"shared_merge_tree_enable_automatic_empty_partitions_cleanup", false, false, "New setting"},
{"shared_merge_tree_empty_partition_lifetime", 86400, 86400, "New setting"},
{"shared_merge_tree_outdated_parts_group_size", 2, 2, "New setting"},
{"shared_merge_tree_use_outdated_parts_compact_format", false, true, "Enable outdated parts v3 by default"},
{"shared_merge_tree_activate_coordinated_merges_tasks", false, false, "New settings"},
});
addSettingsChanges(merge_tree_settings_changes_history, "25.8",
{
{"object_serialization_version", "v2", "v2", "Add a setting to control JSON serialization versions"},
Expand Down
3 changes: 1 addition & 2 deletions src/Disks/DiskEncryptedTransaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskEncryptedTransaction::writeFile( //
header.init_vector = FileEncryption::InitVector::random();
}
auto buffer = delegate_transaction->writeFile(wrapped_path, buf_size, mode, settings, autocommit);
return std::make_unique<WriteBufferFromEncryptedFile>(buf_size, std::move(buffer), key, header, old_file_size);

return std::make_unique<WriteBufferFromEncryptedFile>(buf_size, std::move(buffer), key, header, old_file_size, settings.use_adaptive_write_buffer, settings.adaptive_write_buffer_initial_size);
}

}
Expand Down
15 changes: 13 additions & 2 deletions src/IO/WriteBufferFromEncryptedFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@ WriteBufferFromEncryptedFile::WriteBufferFromEncryptedFile(
std::unique_ptr<WriteBufferFromFileBase> out_,
const String & key_,
const FileEncryption::Header & header_,
size_t old_file_size)
: WriteBufferDecorator<WriteBufferFromFileBase>(std::move(out_), buffer_size_, nullptr, 0)
size_t old_file_size,
bool use_adaptive_buffer_size_,
size_t adaptive_buffer_initial_size)
: WriteBufferDecorator<WriteBufferFromFileBase>(std::move(out_), use_adaptive_buffer_size_ ? adaptive_buffer_initial_size : buffer_size_, nullptr, 0)
, header(header_)
, flush_header(!old_file_size)
, encryptor(header.algorithm, key_, header.init_vector)
, use_adaptive_buffer_size(use_adaptive_buffer_size_)
, adaptive_buffer_max_size(buffer_size_)
{
encryptor.setOffset(old_file_size);
}
Expand Down Expand Up @@ -57,6 +61,13 @@ void WriteBufferFromEncryptedFile::nextImpl()
}

encryptor.encrypt(working_buffer.begin(), offset(), *out);

/// Increase buffer size for next data if adaptive buffer size is used and nextImpl was called because of end of buffer.
if (!available() && use_adaptive_buffer_size && memory.size() < adaptive_buffer_max_size)
{
memory.resize(std::min(memory.size() * 2, adaptive_buffer_max_size));
BufferBase::set(memory.data(), memory.size(), 0);
}
}

}
Expand Down
7 changes: 6 additions & 1 deletion src/IO/WriteBufferFromEncryptedFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ class WriteBufferFromEncryptedFile : public WriteBufferDecorator<WriteBufferFrom
std::unique_ptr<WriteBufferFromFileBase> out_,
const String & key_,
const FileEncryption::Header & header_,
size_t old_file_size = 0);
size_t old_file_size,
bool use_adaptive_buffer_size_,
size_t adaptive_buffer_initial_size);

~WriteBufferFromEncryptedFile() override;

Expand All @@ -41,6 +43,9 @@ class WriteBufferFromEncryptedFile : public WriteBufferDecorator<WriteBufferFrom
FileEncryption::Encryptor encryptor;

LoggerPtr log = getLogger("WriteBufferFromEncryptedFile");

bool use_adaptive_buffer_size;
size_t adaptive_buffer_max_size;
};

}
Expand Down
2 changes: 1 addition & 1 deletion src/IO/tests/gtest_file_encryption.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ TEST(FileEncryptionPositionUpdateTest, Decryption)
header.init_vector = InitVector::random();

auto lwb = std::make_unique<WriteBufferFromFile>(tmp_path);
WriteBufferFromEncryptedFile wb(10, std::move(lwb), key, header);
WriteBufferFromEncryptedFile wb(10, std::move(lwb), key, header, /*old_file_size=*/0, /*use_adaptive_buffer_size_=*/ false, /*adaptive_buffer_initial_size=*/ 0);
auto data = getRandomASCIIString(20);
wb.write(data.data(), data.size());
wb.finalize();
Expand Down
1 change: 0 additions & 1 deletion src/IO/tests/gtest_writebuffer_s3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include <IO/WriteBufferFromS3.h>
#include <IO/S3Common.h>
#include <IO/FileEncryptionCommon.h>
#include <IO/WriteBufferFromEncryptedFile.h>
#include <IO/ReadBufferFromEncryptedFile.h>
#include <IO/AsyncReadCounters.h>
#include <IO/ReadBufferFromS3.h>
Expand Down
4 changes: 3 additions & 1 deletion src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,9 @@ void MergeTreeDataPartWriterWide::addStreams(
max_compress_block_size = settings.max_compress_block_size;

WriteSettings query_write_settings = settings.query_write_settings;
query_write_settings.use_adaptive_write_buffer = settings.use_adaptive_write_buffer_for_dynamic_subcolumns && ISerialization::isDynamicSubcolumn(substream_path, substream_path.size());
query_write_settings.use_adaptive_write_buffer =
(settings.min_columns_to_activate_adaptive_write_buffer && columns_list.size() >= settings.min_columns_to_activate_adaptive_write_buffer)
|| (settings.use_adaptive_write_buffer_for_dynamic_subcolumns && ISerialization::isDynamicSubcolumn(substream_path, substream_path.size()));
query_write_settings.adaptive_write_buffer_initial_size = settings.adaptive_write_buffer_initial_size;

column_streams[stream_name] = std::make_unique<Stream<false>>(
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/MergeTreeIOSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ namespace MergeTreeSetting
{
extern const MergeTreeSettingsBool compress_primary_key;
extern const MergeTreeSettingsBool use_adaptive_write_buffer_for_dynamic_subcolumns;
extern const MergeTreeSettingsUInt64 min_columns_to_activate_adaptive_write_buffer;
extern const MergeTreeSettingsBool use_compact_variant_discriminators_serialization;
extern const MergeTreeSettingsNonZeroUInt64 marks_compress_block_size;
extern const MergeTreeSettingsString marks_compression_codec;
Expand Down Expand Up @@ -84,6 +85,7 @@ MergeTreeWriterSettings::MergeTreeWriterSettings(
, object_shared_data_serialization_version(data_part->isZeroLevel() ? (*storage_settings)[MergeTreeSetting::object_shared_data_serialization_version_for_zero_level_parts] : (*storage_settings)[MergeTreeSetting::object_shared_data_serialization_version])
, object_shared_data_buckets(isCompactPart(data_part) ? (*storage_settings)[MergeTreeSetting::object_shared_data_buckets_for_compact_part] : (*storage_settings)[MergeTreeSetting::object_shared_data_buckets_for_wide_part])
, use_adaptive_write_buffer_for_dynamic_subcolumns((*storage_settings)[MergeTreeSetting::use_adaptive_write_buffer_for_dynamic_subcolumns])
, min_columns_to_activate_adaptive_write_buffer((*storage_settings)[MergeTreeSetting::min_columns_to_activate_adaptive_write_buffer])
, adaptive_write_buffer_initial_size((*storage_settings)[MergeTreeSetting::adaptive_write_buffer_initial_size])
{
}
Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/MergeTreeIOSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ struct MergeTreeWriterSettings
MergeTreeObjectSharedDataSerializationVersion object_shared_data_serialization_version;
size_t object_shared_data_buckets = 1;
bool use_adaptive_write_buffer_for_dynamic_subcolumns;
size_t min_columns_to_activate_adaptive_write_buffer;
size_t adaptive_write_buffer_initial_size;
};

Expand Down
6 changes: 6 additions & 0 deletions src/Storages/MergeTree/MergeTreeSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,12 @@ namespace ErrorCodes
DECLARE(UInt64, adaptive_write_buffer_initial_size, 16 * 1024, R"(
Initial size of an adaptive write buffer
)", 0) \
DECLARE(UInt64, min_columns_to_activate_adaptive_write_buffer, 500, R"(
Allow to reduce memory usage for tables with lots of columns by using adaptive writer buffers.
Possible values:
- 0 - unlimited
- 1 - always enabled
)", 0) \
DECLARE(UInt64, min_free_disk_bytes_to_perform_insert, 0, R"(
The minimum number of bytes that should be free in disk space in order to
insert data. If the number of available free bytes is less than
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
metric_log_0 Compact Horizontal 400000000
metric_log_0 Wide Horizontal 6000000000
metric_log_1 Compact Horizontal 400000000
metric_log_1 Wide Horizontal 400000000
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
-- Tags: no-debug, long
-- - no-debug - debug build uses S3 and this in combination works very slow

-- Note, all tests avoid vertical merges (enable_vertical_merge_algorithm=0), since they write column by column and do not have excessive memory usage.

-- Drop the tables the loop below actually creates, so re-runs start clean.
drop table if exists metric_log_0;
drop table if exists metric_log_1;

-- Nothing better than production table
system flush logs system.metric_log;

{% for v in [0, 1] %}
create table metric_log_{{ v }} as system.metric_log engine=MergeTree order by () settings min_columns_to_activate_adaptive_write_buffer={{ v }}, min_bytes_for_wide_part=1e9, enable_vertical_merge_algorithm=0, auto_statistics_types='';
insert into metric_log_{{ v }} select * from generateRandom() limit 1 settings max_memory_usage = '100Mi' /* usually few tens of MiB */;
optimize table metric_log_{{ v }} final;
truncate table metric_log_{{ v }};
alter table metric_log_{{ v }} modify setting min_bytes_for_wide_part=0;
insert into metric_log_{{ v }} select * from generateRandom() limit 1 settings max_memory_usage = '{{ 400 if v == 1 else 6000 }}Mi' /* w/o adaptive buffers uses ~3 (for regular) and ~5-6 (for encrypted disks), w/ 100MiB for regular and ~200-400MiB for encrypted disks */;
optimize table metric_log_{{ v }} final;
drop table metric_log_{{ v }};
{% endfor %}

-- Flush system tables all at once
system flush logs part_log;
select
table,
part_type,
merge_algorithm,
max2(
peak_memory_usage,
multiIf(
-- see comments for max_memory_usage
part_type = 'Compact', 400e6,
table = 'metric_log_1', 400e6,
table = 'metric_log_0', 6e9,
0.
)
)
from system.part_log
where database = currentDatabase() and event_type = 'MergeParts'
order by event_time_microseconds;
Loading