feat(clp-s::log_converter): Add support for compressing converted KV-IR files; make compressing outputs the default behaviour.#2240
Conversation
…mpressed KV-IR outputs the default in log-converter.
|
Caution Review failedFailed to post review comments Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughAdds optional Zstandard compression for converted KV-IR files: build wiring for FileWriter and streaming compression sources, threads a compression flag through LogConverter and CLI, and refactors LogSerializer to use a nested writer chain (FileWriter ± Zstd Compressor). ChangesLog Conversion with Optional Compression
Sequence DiagramsequenceDiagram
participant CLI as CommandLineArguments
participant Converter as LogConverter
participant Serializer as LogSerializer
participant FileWriter as FileWriter
participant Zstd as ZstdCompressor
CLI->>Converter: convert_file(path, reader, out_dir, compress_flag)
Converter->>Serializer: create(out_dir, path, compress_flag)
alt compress_flag == true
Serializer->>FileWriter: create/open file (uncompressed layer)
Serializer->>Zstd: create/open compressor wrapping FileWriter
else compress_flag == false
Serializer->>FileWriter: create/open file
end
Serializer-->>Converter: LogSerializer(with nested writers)
Converter->>Serializer: add_message(record)
Serializer->>FileWriter: write (optionally via Zstd)
Converter->>Serializer: close()
Serializer->>Zstd: close (if present)
Serializer->>FileWriter: close
Estimated Code Review Effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@components/core/src/clp_s/CMakeLists.txt`:
- Around line 70-71: The CMake source list erroneously repeats
"../clp/FileReader.hpp" instead of including the header for the newly added
implementation; update the list in CMakeLists.txt so that the entry
corresponding to the added "../clp/FileWriter.cpp" is "../clp/FileWriter.hpp"
(remove the duplicate FileReader.hpp and add FileWriter.hpp) to ensure
FileWriter's header is present in the IDE/source set.
In `@components/core/src/clp_s/log_converter/LogSerializer.cpp`:
- Around line 53-64: The create() path currently only catches exceptions from
clp::FileWriter::open() and maps all failures to
std::errc::no_such_file_or_directory, and it leaves
clp::streaming_compression::zstd::Compressor::open() unguarded; update
LogSerializer::create to wrap both the FileWriter setup and the compressor setup
in their own try/catch blocks so no exceptions propagate (honor the
Result<LogSerializer> contract), and when catching exceptions from
clp::FileWriter use exception details to map to more accurate std::errc values
(or attach the original error) instead of always returning
no_such_file_or_directory; specifically add a try/catch around
compressor->open() (and nested_writers.emplace_back for the compressor) to
convert OperationFailed and other throws into the function's Result error
return, and change the FileWriter catch to inspect the exception type/message
before returning the appropriate std::errc.
In `@components/core/src/clp_s/log_converter/LogSerializer.hpp`:
- Around line 80-93: The close() method performs finalization (flush_buffer(),
m_nested_writers.back()->write_numeric_value(clp::ffi::ir_stream::cProtocol::Eof)
and closing compressors/FileWriter instances) but is opt-in and can be skipped;
change to an abort/commit lifecycle: write outputs to a temporary file during
the LogSerializer lifetime, ensure close() performs the commit/rename on success
and marks the object committed, and ensure the destructor (or an RAII guard
owned by LogSerializer) deletes the temporary file if not committed so
incomplete outputs are removed. Make this change around the existing symbols:
close(), flush_buffer(), m_nested_writers, write_numeric_value(...Eof),
dynamic_cast<clp::streaming_compression::Compressor*> and
dynamic_cast<clp::FileWriter*> so that successful close() renames temp->final
and the destructor/abort path cleans up the temp file.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 2c27d2a3-26c8-44cd-b631-141dc5c77976
📒 Files selected for processing (8)
components/core/src/clp_s/CMakeLists.txtcomponents/core/src/clp_s/log_converter/CommandLineArguments.cppcomponents/core/src/clp_s/log_converter/CommandLineArguments.hppcomponents/core/src/clp_s/log_converter/LogConverter.cppcomponents/core/src/clp_s/log_converter/LogConverter.hppcomponents/core/src/clp_s/log_converter/LogSerializer.cppcomponents/core/src/clp_s/log_converter/LogSerializer.hppcomponents/core/src/clp_s/log_converter/log_converter.cpp
| void close() { | ||
| flush_buffer(); | ||
| m_writer.write_numeric_value(clp::ffi::ir_stream::cProtocol::Eof); | ||
| m_writer.close(); | ||
| m_nested_writers.back()->write_numeric_value(clp::ffi::ir_stream::cProtocol::Eof); | ||
| for (auto it{m_nested_writers.rbegin()}; it != m_nested_writers.rend(); ++it) { | ||
| if (auto compressor{dynamic_cast<clp::streaming_compression::Compressor*>(it->get())}; | ||
| nullptr != compressor) | ||
| { | ||
| compressor->close(); | ||
| } else if (auto file_writer{dynamic_cast<clp::FileWriter*>(it->get())}; | ||
| nullptr != file_writer) | ||
| { | ||
| file_writer->close(); | ||
| } | ||
| } |
There was a problem hiding this comment.
close() is carrying correctness-critical work that callers can skip.
This method now writes the EOF marker and finalises the writer chain, but it is still entirely opt-in. Any later TRY* or parse-error return after construction can bypass it, leaving an incomplete compressed output behind in the destination directory. Please move to an abort/commit lifecycle here (for example, temp file + rename on success, delete on failure) or make destruction clean up incomplete outputs automatically.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@components/core/src/clp_s/log_converter/LogSerializer.hpp` around lines 80 -
93, The close() method performs finalization (flush_buffer(),
m_nested_writers.back()->write_numeric_value(clp::ffi::ir_stream::cProtocol::Eof)
and closing compressors/FileWriter instances) but is opt-in and can be skipped;
change to an abort/commit lifecycle: write outputs to a temporary file during
the LogSerializer lifetime, ensure close() performs the commit/rename on success
and marks the object committed, and ensure the destructor (or an RAII guard
owned by LogSerializer) deletes the temporary file if not committed so
incomplete outputs are removed. Make this change around the existing symbols:
close(), flush_buffer(), m_nested_writers, write_numeric_value(...Eof),
dynamic_cast<clp::streaming_compression::Compressor*> and
dynamic_cast<clp::FileWriter*> so that successful close() renames temp->final
and the destructor/abort path cleans up the temp file.
There was a problem hiding this comment.
♻️ Duplicate comments (2)
components/core/src/clp_s/log_converter/LogSerializer.cpp (1)
53-71:⚠️ Potential issue | 🟠 Major | ⚡ Quick winBoth
catchblocks still discard the original error category.The compressor stage is now wrapped (good), but both handlers swallow
std::exceptionand collapse every failure to a singlestd::errcvalue —no_such_file_or_directoryfor anyFileWriter::openfailure (including permission, EROFS, EIO, name-too-long, etc.) andprotocol_errorfor any Compressor failure (including allocation failure, parameter errors, OOM). This silently misleads callers and complicates user-facing diagnostics. Consider catchingclp::FileWriter::OperationFailed/clp::TraceableException(or inspectingerrno/what()) and propagating a more accuratestd::errc, or returning a richer error type.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@components/core/src/clp_s/log_converter/LogSerializer.cpp` around lines 53 - 71, The current catch-all handlers around FileWriter::open and zstd::Compressor::open swallow exception details and map every failure to a single std::errc; change them to preserve and map the original error: catch the concrete exception types thrown by clp (e.g., clp::FileWriter::OperationFailed or clp::TraceableException) and extract error info (errno, error code, or what()) to translate to a more specific std::errc (or return a richer error type) before returning; update the FileWriter::open catch to inspect/propagate permission/ENOENT/EROFS/EIO/name-too-long cases and update the Compressor::open catch to distinguish allocation/parameter/OOM errors (or rethrow the original exception) so callers receive accurate failure reasons while still using nested_writers and the LogSerializer constructor.components/core/src/clp_s/log_converter/LogSerializer.hpp (1)
81-95:⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift
close()finalization remains opt-in and bypassable.
close()is the only path that writes the EOF marker and runsCompressor::close()/FileWriter::close()(viadynamic_cast). Because the destructor is= default, anyTRY*/error return aftercreate()succeeds (or any unexpected throw fromadd_message/flush_buffercallers) will skip this method, leaving a half-written, EOF-less, possibly-truncated.clp/.clp.zstfile in the destination directory that downstream readers cannot reliably interpret. Please move to an abort/commit lifecycle (e.g., write to a temp filename and rename on successfulclose(), delete on destruction if uncommitted) so partial outputs are never observable.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@components/core/src/clp_s/log_converter/LogSerializer.hpp` around lines 81 - 95, The close() path currently is optional and can be skipped, leaving incomplete output; modify the lifecycle to an explicit abort/commit model: have create() open a temporary target (e.g., same name + ".tmp") and set an internal "committed" flag false; ensure close() writes the EOF via m_nested_writers.back()->write_numeric_value(...), calls flush_buffer(), invokes dynamic_casted Compressor::close() / FileWriter::close() on entries in m_nested_writers, then atomically rename the temp file to the final filename and set "committed" true; implement a destructor that checks "committed" and if false deletes the temp file (or calls an abort() that closes any open handles without writing EOF and removes temp); ensure add_message/flush_buffer errors leave state consistent (close/abort safe to call) and document the new behavior in create(), close(), and the destructor; reference functions/fields: close(), flush_buffer(), m_nested_writers, Compressor, FileWriter, create(), add_message, and the defaulted destructor so reviewers can locate and change the code.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Duplicate comments:
In `@components/core/src/clp_s/log_converter/LogSerializer.cpp`:
- Around line 53-71: The current catch-all handlers around FileWriter::open and
zstd::Compressor::open swallow exception details and map every failure to a
single std::errc; change them to preserve and map the original error: catch the
concrete exception types thrown by clp (e.g., clp::FileWriter::OperationFailed
or clp::TraceableException) and extract error info (errno, error code, or
what()) to translate to a more specific std::errc (or return a richer error
type) before returning; update the FileWriter::open catch to inspect/propagate
permission/ENOENT/EROFS/EIO/name-too-long cases and update the Compressor::open
catch to distinguish allocation/parameter/OOM errors (or rethrow the original
exception) so callers receive accurate failure reasons while still using
nested_writers and the LogSerializer constructor.
In `@components/core/src/clp_s/log_converter/LogSerializer.hpp`:
- Around line 81-95: The close() path currently is optional and can be skipped,
leaving incomplete output; modify the lifecycle to an explicit abort/commit
model: have create() open a temporary target (e.g., same name + ".tmp") and set
an internal "committed" flag false; ensure close() writes the EOF via
m_nested_writers.back()->write_numeric_value(...), calls flush_buffer(), invokes
dynamic_casted Compressor::close() / FileWriter::close() on entries in
m_nested_writers, then atomically rename the temp file to the final filename and
set "committed" true; implement a destructor that checks "committed" and if
false deletes the temp file (or calls an abort() that closes any open handles
without writing EOF and removes temp); ensure add_message/flush_buffer errors
leave state consistent (close/abort safe to call) and document the new behavior
in create(), close(), and the destructor; reference functions/fields: close(),
flush_buffer(), m_nested_writers, Compressor, FileWriter, create(), add_message,
and the defaulted destructor so reviewers can locate and change the code.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 9f5947e6-f24f-4ce3-b915-e949ec7d7de1
📒 Files selected for processing (3)
components/core/src/clp_s/CMakeLists.txtcomponents/core/src/clp_s/log_converter/LogSerializer.cppcomponents/core/src/clp_s/log_converter/LogSerializer.hpp
20001020ycx
left a comment
There was a problem hiding this comment.
Overall LGTM, good work, just a minor nit and a design question from myself.
| void flush_buffer() { | ||
| auto const buffer{m_serializer.get_ir_buf_view()}; | ||
| m_writer.write( | ||
| m_nested_writers.back()->write( |
There was a problem hiding this comment.
I am a bit confused on the order of the component in the pipeline. Looks like the m_nested_writers vector is defined in the order of [FileWriter, Compressor], given that you did emplace_back at create. When zstd is on, .back() resolves to Compressor, which means flush_buffer() now writes to Compressor rather than FileWriter. Does this change the semantic of this function?
Furthermore, could we define the order of the expected writer chain at the m_nested_writers declaration? I feel like this PR assumes some order of the m_nested_writers in coding, I think this would help in case we add more components in the future?
There was a problem hiding this comment.
Yeah I can update the docs to be clear about the expectation for nested writers. The intention is that the writers are in an order like:
output sink writer <- nested writer 1 <- nested writer 2, etc.
The semantic of the flush_buffer function is that it's supposed to write the the buffered output from the IR serializer to some output sink. Both the previous implementation and this implementation don't flush the output sink at this time, so there's no semantic change w.r.t. durability.
|
|
||
| boost::uuids::random_generator uuid_generator; | ||
| std::string const file_name{boost::uuids::to_string(uuid_generator()) + ".clp"}; | ||
| auto file_extension{use_zstd ? clp::ir::cIrFileExtension : cDefaultIRFileExtension}; |
There was a problem hiding this comment.
Nit, feel like this import clp:ir:cIrFileExtension doesn't convey the compression we will perform, but probably this is outside the scopeof this PR to update this as I assume there would be many uses of this constant. Maybe name cDefaultIrFileExtension to cUncompressedFileExtension.
Emmmm, I will leave it to you, I am okay with either, just noticed while reading through the code, lol.
There was a problem hiding this comment.
Yeah cUncompressedFileExtension is more readable, will update.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@components/core/src/clp_s/log_converter/LogSerializer.cpp`:
- Line 67: Replace the unnecessary .get() dereference when passing the
unique_ptr target to compressor->open: change the call using
nested_writers.back().get() to directly dereference the unique_ptr (use
*nested_writers.back()) so compressor->open receives the referenced object; look
for the expression compressor->open(*nested_writers.back().get()) in
LogSerializer.cpp and update it to compressor->open(*nested_writers.back()) to
remove the redundant .get().
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: dbe6527c-a4c7-4c82-8406-bf0a509596a0
📒 Files selected for processing (3)
components/core/src/clp_s/CMakeLists.txtcomponents/core/src/clp_s/log_converter/LogSerializer.cppcomponents/core/src/clp_s/log_converter/LogSerializer.hpp
|
|
||
| try { | ||
| auto compressor{std::make_unique<clp::streaming_compression::zstd::Compressor>()}; | ||
| compressor->open(*nested_writers.back().get()); |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial | 💤 Low value
Redundant .get() when dereferencing a unique_ptr.
nested_writers.back() already yields a std::unique_ptr<...>&, which can be dereferenced directly. The .get() call is unnecessary.
♻️ Proposed tweak
- compressor->open(*nested_writers.back().get());
+ compressor->open(*nested_writers.back());🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@components/core/src/clp_s/log_converter/LogSerializer.cpp` at line 67,
Replace the unnecessary .get() dereference when passing the unique_ptr target to
compressor->open: change the call using nested_writers.back().get() to directly
dereference the unique_ptr (use *nested_writers.back()) so compressor->open
receives the referenced object; look for the expression
compressor->open(*nested_writers.back().get()) in LogSerializer.cpp and update
it to compressor->open(*nested_writers.back()) to remove the redundant .get().
| * Creates an instance of `LogSerializer`. | ||
| * @param output_dir The destination directory for generated KV-IR. | ||
| * @param original_file_path The original path for the file being converted to KV-IR. | ||
| * @param use_zstd Whether the output KV-IR should be zstd-compressed. |
There was a problem hiding this comment.
How about compress_with_zstd? Feels a bit more specific.
| m_writer.close(); | ||
| m_nested_writers.back()->write_numeric_value(clp::ffi::ir_stream::cProtocol::Eof); | ||
| for (auto it{m_nested_writers.rbegin()}; it != m_nested_writers.rend(); ++it) { | ||
| if (auto compressor{dynamic_cast<clp::streaming_compression::Compressor*>(it->get())}; |
There was a problem hiding this comment.
Can we open an issue about this? If one already exists, can we link it in the PR description?
Co-authored-by: kirkrodrigues <2454684+kirkrodrigues@users.noreply.github.com>
Description
This PR adds support for compressing converted KV-IR files using zstd. This is now the default behaviour for log-converter, and can be disabled using the
--no-compress-converted-filesflag.Implementation-wise this involves changing
LogSeriailzerto accept a series of nested writers instead of a singleFileWriter. To simplify the implementation we switch to using theclpversion of aFileWriterandzstd::Compressor, since they implementWriterInterface, which makes them easier to work with in a more generic way. Aside:closeshould probably be made part ofclp::WriterInterface, and without that I'm forced to test whether a givenclp::WriterInterfaceimplementscloseviadynamic_castto the many derived classes that individually implementclose. (Added 2291 to track this issue with close).Checklist
breaking change.
Validation performed
hive-24hrdataset is only ~0.5% slower (on an HDD)Summary by CodeRabbit
New Features
Refactor
Chores