Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions components/core/src/clp_s/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ set(
../clp/FileDescriptor.hpp
../clp/FileReader.cpp
../clp/FileReader.hpp
../clp/FileWriter.cpp
../clp/FileWriter.hpp
../clp/GrepCore.cpp
../clp/GrepCore.hpp
../clp/SchemaSearcher.cpp
Expand Down Expand Up @@ -98,7 +100,10 @@ set(
../clp/streaming_archive/ArchiveMetadata.hpp
../clp/streaming_archive/Constants.hpp
../clp/streaming_compression/Constants.hpp
../clp/streaming_compression/Compressor.hpp
../clp/streaming_compression/Decompressor.hpp
../clp/streaming_compression/zstd/Compressor.cpp
../clp/streaming_compression/zstd/Compressor.hpp
../clp/streaming_compression/zstd/Decompressor.cpp
../clp/streaming_compression/zstd/Decompressor.hpp
../clp/StringReader.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ auto CommandLineArguments::parse_arguments(int argc, char const** argv)
po::options_description conversion_options("Conversion options");
std::string input_path_list_file_path;
std::string auth{cNoAuth};
bool no_compress_converted_files{false == m_compress_converted_files};
// clang-format off
conversion_options.add_options()(
"inputs-from,f",
Expand Down Expand Up @@ -143,6 +144,10 @@ auto CommandLineArguments::parse_arguments(int argc, char const** argv)
->value_name("LOG_EVENT_SIZE")
->default_value(m_max_log_event_size),
"Maximum allowed size (B) for a single log event before conversion fails."
)(
"no-compress-converted-files",
po::bool_switch(&no_compress_converted_files),
"Disable compression on the converted KV-IR files."
);
// clang-format on

Expand Down Expand Up @@ -198,6 +203,8 @@ auto CommandLineArguments::parse_arguments(int argc, char const** argv)
if (m_max_log_event_size <= 0) {
throw std::invalid_argument("Max event size must be greater than zero.");
}

m_compress_converted_files = false == no_compress_converted_files;
} catch (std::exception& e) {
SPDLOG_ERROR("{}", e.what());
print_basic_usage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ class CommandLineArguments {

[[nodiscard]] auto get_max_log_event_size() const -> size_t { return m_max_log_event_size; }

[[nodiscard]] auto get_compress_converted_files() const -> bool {
return m_compress_converted_files;
}

private:
// Methods
void print_basic_usage() const;
Expand All @@ -47,6 +51,7 @@ class CommandLineArguments {
NetworkAuthOption m_network_auth{};
std::string m_output_dir{"./"};
size_t m_max_log_event_size{512ULL * 1024ULL * 1024ULL}; // 512 MiB
bool m_compress_converted_files{true};
};
} // namespace clp_s::log_converter

Expand Down
7 changes: 5 additions & 2 deletions components/core/src/clp_s/log_converter/LogConverter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,18 @@ auto LogConverter::create(size_t max_buffer_size) -> LogConverter {
auto LogConverter::convert_file(
clp_s::Path const& path,
clp::ReaderInterface* reader,
std::string_view output_dir
std::string_view output_dir,
bool compress_converted_file
) -> ystdlib::error_handling::Result<void> {
m_parser.reset();

// Reset internal buffer state.
m_parser_offset = 0ULL;
m_num_bytes_buffered = 0ULL;

auto serializer{YSTDLIB_ERROR_HANDLING_TRYX(LogSerializer::create(output_dir, path.path))};
auto serializer{YSTDLIB_ERROR_HANDLING_TRYX(
LogSerializer::create(output_dir, path.path, compress_converted_file)
)};

bool reached_end_of_stream{false};
while (false == reached_end_of_stream) {
Expand Down
10 changes: 7 additions & 3 deletions components/core/src/clp_s/log_converter/LogConverter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,19 @@ class LogConverter {
* @param path The input path for the unstructured text file.
* @param reader A reader positioned at the start of the input stream.
* @param output_dir The output directory for generated KV-IR files.
* @param compress_converted_file Whether the converted file should be compressed.
* @return A void result on success, or an error code indicating the failure:
* - std::errc::no_message if `log_surgeon::BufferParser::parse_next_event` returns an error.
* - Forwards `LogSerializer::create()`'s return values.
* - Forwards `refill_buffer()`'s return values.
* - Forwards `LogSerializer::add_message()`'s return values.
*/
[[nodiscard]] auto
convert_file(clp_s::Path const& path, clp::ReaderInterface* reader, std::string_view output_dir)
-> ystdlib::error_handling::Result<void>;
[[nodiscard]] auto convert_file(
clp_s::Path const& path,
clp::ReaderInterface* reader,
std::string_view output_dir,
bool compress_converted_file
) -> ystdlib::error_handling::Result<void>;

private:
// Constants
Expand Down
44 changes: 35 additions & 9 deletions components/core/src/clp_s/log_converter/LogSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
#include <array>
#include <cstdint>
#include <exception>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <system_error>
#include <utility>
#include <vector>

#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/uuid_io.hpp>
Expand All @@ -16,17 +18,23 @@
#include <nlohmann/json_fwd.hpp>
#include <ystdlib/error_handling/Result.hpp>

#include "../../clp/ffi/ir_stream/Serializer.hpp"
#include "../../clp/ir/types.hpp"
#include "../FileWriter.hpp"
#include <clp/ffi/ir_stream/Serializer.hpp>
#include <clp/FileWriter.hpp>
#include <clp/ir/constants.hpp>
#include <clp/ir/types.hpp>
#include <clp/streaming_compression/zstd/Compressor.hpp>

namespace clp_s::log_converter {
namespace {
constexpr msgpack::object_map cEmptyMap{.size = 0U, .ptr = nullptr};
constexpr std::string_view cUncompressedFileExtension{".clp"};
} // namespace

auto LogSerializer::create(std::string_view output_dir, std::string_view original_file_path)
-> ystdlib::error_handling::Result<LogSerializer> {
auto LogSerializer::create(
std::string_view output_dir,
std::string_view original_file_path,
bool compress_with_zstd
) -> ystdlib::error_handling::Result<LogSerializer> {
nlohmann::json metadata;
metadata.emplace(cOriginalFileMetadataKey, original_file_path);
auto serializer{YSTDLIB_ERROR_HANDLING_TRYX(
Expand All @@ -36,16 +44,34 @@ auto LogSerializer::create(std::string_view output_dir, std::string_view origina
)};

boost::uuids::random_generator uuid_generator;
std::string const file_name{boost::uuids::to_string(uuid_generator()) + ".clp"};
auto file_extension{
compress_with_zstd ? clp::ir::cIrFileExtension : cUncompressedFileExtension
};
std::string const file_name{
boost::uuids::to_string(uuid_generator()) + std::string{file_extension}
};
auto const converted_path{std::filesystem::path{output_dir} / file_name};
clp_s::FileWriter writer;
std::vector<std::unique_ptr<clp::WriterInterface>> nested_writers;
try {
writer.open(converted_path, clp_s::FileWriter::OpenMode::CreateForWriting);
auto writer{std::make_unique<clp::FileWriter>()};
writer->open(converted_path, clp::FileWriter::OpenMode::CREATE_FOR_WRITING);
nested_writers.emplace_back(std::move(writer));
} catch (std::exception const&) {
return std::errc::no_such_file_or_directory;
}

return LogSerializer{std::move(serializer), std::move(writer)};
if (false == compress_with_zstd) {
return LogSerializer{std::move(serializer), std::move(nested_writers)};
}

try {
auto compressor{std::make_unique<clp::streaming_compression::zstd::Compressor>()};
compressor->open(*nested_writers.back().get());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial | 💤 Low value

Redundant .get() when dereferencing a unique_ptr.

nested_writers.back() already yields a std::unique_ptr<...>&, which can be dereferenced directly. The .get() call is unnecessary.

♻️ Proposed tweak
-        compressor->open(*nested_writers.back().get());
+        compressor->open(*nested_writers.back());
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@components/core/src/clp_s/log_converter/LogSerializer.cpp` at line 67,
Replace the unnecessary .get() dereference when passing the unique_ptr target to
compressor->open: change the call using nested_writers.back().get() to directly
dereference the unique_ptr (use *nested_writers.back()) so compressor->open
receives the referenced object; look for the expression
compressor->open(*nested_writers.back().get()) in LogSerializer.cpp and update
it to compressor->open(*nested_writers.back()) to remove the redundant .get().

nested_writers.emplace_back(std::move(compressor));
} catch (std::exception const&) {
return std::errc::protocol_error;
}
return LogSerializer{std::move(serializer), std::move(nested_writers)};
}

auto LogSerializer::add_message(std::string_view timestamp, std::string_view message)
Expand Down
52 changes: 37 additions & 15 deletions components/core/src/clp_s/log_converter/LogSerializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@
#define CLP_S_LOG_CONVERTER_LOGSERIALIZER_HPP

#include <cstddef>
#include <memory>
#include <string_view>
#include <utility>
#include <vector>

#include <ystdlib/error_handling/Result.hpp>

#include "../../clp/ffi/ir_stream/protocol_constants.hpp"
#include "../../clp/ffi/ir_stream/Serializer.hpp"
#include "../../clp/ir/types.hpp"
#include "../../clp/type_utils.hpp"
#include "../FileWriter.hpp"
#include <clp/ffi/ir_stream/protocol_constants.hpp>
#include <clp/ffi/ir_stream/Serializer.hpp>
#include <clp/FileWriter.hpp>
#include <clp/ir/types.hpp>
#include <clp/streaming_compression/Compressor.hpp>
#include <clp/type_utils.hpp>
#include <clp/WriterInterface.hpp>

namespace clp_s::log_converter {
/**
Expand All @@ -24,14 +28,18 @@ class LogSerializer {
* Creates an instance of `LogSerializer`.
* @param output_dir The destination directory for generated KV-IR.
* @param original_file_path The original path for the file being converted to KV-IR.
* @param compress_with_zstd Whether the output KV-IR should be zstd-compressed.
* @return A result containing a `LogSerializer` on success, or an error code indicating the
* failure:
* - std::errc::no_such_file_or_directory if a `clp_s::FileWriter` fails to open an output file.
* - std::errc::no_such_file_or_directory if a `clp::FileWriter` fails to open an output file.
* - std::errc::protocol_error if a `clp::zstd::Compressor` fails to open a compression stream.
* - Forwards `clp::ffi::ir_stream::Serializer<>::create()`'s return values.
*/
[[nodiscard]] static auto
create(std::string_view output_dir, std::string_view original_file_path)
-> ystdlib::error_handling::Result<LogSerializer>;
[[nodiscard]] static auto create(
std::string_view output_dir,
std::string_view original_file_path,
bool compress_with_zstd
) -> ystdlib::error_handling::Result<LogSerializer>;

// Constructors
// Delete copy constructor and assignment operator
Expand Down Expand Up @@ -74,8 +82,18 @@ class LogSerializer {
*/
void close() {
flush_buffer();
m_writer.write_numeric_value(clp::ffi::ir_stream::cProtocol::Eof);
m_writer.close();
m_nested_writers.back()->write_numeric_value(clp::ffi::ir_stream::cProtocol::Eof);
for (auto it{m_nested_writers.rbegin()}; it != m_nested_writers.rend(); ++it) {
if (auto compressor{dynamic_cast<clp::streaming_compression::Compressor*>(it->get())};
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we open an issue about this? If one already exists, can we link it in the PR description?

nullptr != compressor)
{
compressor->close();
} else if (auto file_writer{dynamic_cast<clp::FileWriter*>(it->get())};
nullptr != file_writer)
{
file_writer->close();
}
}
}

private:
Expand All @@ -88,26 +106,30 @@ class LogSerializer {
// Constructors
explicit LogSerializer(
clp::ffi::ir_stream::Serializer<clp::ir::eight_byte_encoded_variable_t>&& serializer,
clp_s::FileWriter&& writer
std::vector<std::unique_ptr<clp::WriterInterface>>&& nested_writers
)
: m_serializer{std::move(serializer)},
m_writer{std::move(writer)} {}
m_nested_writers{std::move(nested_writers)} {}

// Methods
/**
* Flushes the buffer from the serializer to the output file.
*/
void flush_buffer() {
auto const buffer{m_serializer.get_ir_buf_view()};
m_writer.write(
m_nested_writers.back()->write(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit confused on the order of the component in the pipeline. Looks like the m_nested_writers vector is defined in the order of [FileWriter, Compressor], given that you did emplace_back at create. When zstd is on, .back() resolves to Compressor, which means flush_buffer() now writes to Compressor rather than FileWriter. Does this change the semantic of this function?

Furthermore, could we define the order of the expected writer chain at the m_nested_writers declaration? I feel like this PR assumes some order of the m_nested_writers in coding, I think this would help in case we add more components in the future?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I can update the docs to be clear about the expectation for nested writers. The intention is that the writers are in an order like:
output sink writer <- nested writer 1 <- nested writer 2, etc.

The semantic of the flush_buffer function is that it's supposed to write the the buffered output from the IR serializer to some output sink. Both the previous implementation and this implementation don't flush the output sink at this time, so there's no semantic change w.r.t. durability.

clp::size_checked_pointer_cast<char const>(buffer.data()),
buffer.size_bytes()
);
m_serializer.clear_ir_buf();
}

clp::ffi::ir_stream::Serializer<clp::ir::eight_byte_encoded_variable_t> m_serializer;
clp_s::FileWriter m_writer;
// Nested writers are ordered from closest to furthest from output sink. Typically, this will
// look like `FileWriter` <- `Compressor`.
Comment thread
gibber9809 marked this conversation as resolved.
// NOTE: This class depends on there being at least one writer in `m_nested_writers` at all
// times.
std::vector<std::unique_ptr<clp::WriterInterface>> m_nested_writers;
};
} // namespace clp_s::log_converter

Expand Down
3 changes: 2 additions & 1 deletion components/core/src/clp_s/log_converter/log_converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ auto convert_files(CommandLineArguments const& command_line_arguments) -> bool {
auto const convert_result{log_converter.convert_file(
path,
nested_readers.back().get(),
command_line_arguments.get_output_dir()
command_line_arguments.get_output_dir(),
command_line_arguments.get_compress_converted_files()
)};
if (convert_result.has_error()) {
auto const& error{convert_result.error()};
Expand Down
Loading