diff --git a/components/core/src/clp_s/CMakeLists.txt b/components/core/src/clp_s/CMakeLists.txt index 3e00b30c1..bdd0ef4f6 100644 --- a/components/core/src/clp_s/CMakeLists.txt +++ b/components/core/src/clp_s/CMakeLists.txt @@ -69,6 +69,8 @@ set( ../clp/FileDescriptor.hpp ../clp/FileReader.cpp ../clp/FileReader.hpp + ../clp/FileWriter.cpp + ../clp/FileWriter.hpp ../clp/GrepCore.cpp ../clp/GrepCore.hpp ../clp/SchemaSearcher.cpp @@ -98,7 +100,10 @@ set( ../clp/streaming_archive/ArchiveMetadata.hpp ../clp/streaming_archive/Constants.hpp ../clp/streaming_compression/Constants.hpp + ../clp/streaming_compression/Compressor.hpp ../clp/streaming_compression/Decompressor.hpp + ../clp/streaming_compression/zstd/Compressor.cpp + ../clp/streaming_compression/zstd/Compressor.hpp ../clp/streaming_compression/zstd/Decompressor.cpp ../clp/streaming_compression/zstd/Decompressor.hpp ../clp/StringReader.cpp diff --git a/components/core/src/clp_s/log_converter/CommandLineArguments.cpp b/components/core/src/clp_s/log_converter/CommandLineArguments.cpp index e449acc97..1e27c6ed8 100644 --- a/components/core/src/clp_s/log_converter/CommandLineArguments.cpp +++ b/components/core/src/clp_s/log_converter/CommandLineArguments.cpp @@ -116,6 +116,7 @@ auto CommandLineArguments::parse_arguments(int argc, char const** argv) po::options_description conversion_options("Conversion options"); std::string input_path_list_file_path; std::string auth{cNoAuth}; + bool no_compress_converted_files{false == m_compress_converted_files}; // clang-format off conversion_options.add_options()( "inputs-from,f", @@ -143,6 +144,10 @@ auto CommandLineArguments::parse_arguments(int argc, char const** argv) ->value_name("LOG_EVENT_SIZE") ->default_value(m_max_log_event_size), "Maximum allowed size (B) for a single log event before conversion fails." + )( + "no-compress-converted-files", + po::bool_switch(&no_compress_converted_files), + "Disable compression on the converted KV-IR files." ); // clang-format on @@ -198,6 +203,8 @@ auto CommandLineArguments::parse_arguments(int argc, char const** argv) if (m_max_log_event_size <= 0) { throw std::invalid_argument("Max event size must be greater than zero."); } + + m_compress_converted_files = false == no_compress_converted_files; } catch (std::exception& e) { SPDLOG_ERROR("{}", e.what()); print_basic_usage(); diff --git a/components/core/src/clp_s/log_converter/CommandLineArguments.hpp b/components/core/src/clp_s/log_converter/CommandLineArguments.hpp index 8060716a3..90278f247 100644 --- a/components/core/src/clp_s/log_converter/CommandLineArguments.hpp +++ b/components/core/src/clp_s/log_converter/CommandLineArguments.hpp @@ -37,6 +37,10 @@ class CommandLineArguments { [[nodiscard]] auto get_max_log_event_size() const -> size_t { return m_max_log_event_size; } + [[nodiscard]] auto get_compress_converted_files() const -> bool { + return m_compress_converted_files; + } + private: // Methods void print_basic_usage() const; @@ -47,6 +51,7 @@ class CommandLineArguments { NetworkAuthOption m_network_auth{}; std::string m_output_dir{"./"}; size_t m_max_log_event_size{512ULL * 1024ULL * 1024ULL}; // 512 MiB + bool m_compress_converted_files{true}; }; } // namespace clp_s::log_converter diff --git a/components/core/src/clp_s/log_converter/LogConverter.cpp b/components/core/src/clp_s/log_converter/LogConverter.cpp index 52bea1fb6..c2602dcfa 100644 --- a/components/core/src/clp_s/log_converter/LogConverter.cpp +++ b/components/core/src/clp_s/log_converter/LogConverter.cpp @@ -53,7 +53,8 @@ auto LogConverter::create(size_t max_buffer_size) -> LogConverter { auto LogConverter::convert_file( clp_s::Path const& path, clp::ReaderInterface* reader, - std::string_view output_dir + std::string_view output_dir, + bool compress_converted_file ) -> ystdlib::error_handling::Result { m_parser.reset(); @@ -61,7 +62,9 @@ auto LogConverter::convert_file( m_parser_offset = 0ULL; m_num_bytes_buffered = 0ULL; - auto serializer{YSTDLIB_ERROR_HANDLING_TRYX(LogSerializer::create(output_dir, path.path))}; + auto serializer{YSTDLIB_ERROR_HANDLING_TRYX( + LogSerializer::create(output_dir, path.path, compress_converted_file) + )}; bool reached_end_of_stream{false}; while (false == reached_end_of_stream) { diff --git a/components/core/src/clp_s/log_converter/LogConverter.hpp b/components/core/src/clp_s/log_converter/LogConverter.hpp index 509eece4f..c17f44349 100644 --- a/components/core/src/clp_s/log_converter/LogConverter.hpp +++ b/components/core/src/clp_s/log_converter/LogConverter.hpp @@ -30,15 +30,19 @@ class LogConverter { * @param path The input path for the unstructured text file. * @param reader A reader positioned at the start of the input stream. * @param output_dir The output directory for generated KV-IR files. + * @param compress_converted_file Whether the converted file should be compressed. * @return A void result on success, or an error code indicating the failure: * - std::errc::no_message if `log_surgeon::BufferParser::parse_next_event` returns an error. * - Forwards `LogSerializer::create()`'s return values. * - Forwards `refill_buffer()`'s return values. * - Forwards `LogSerializer::add_message()`'s return values. */ - [[nodiscard]] auto - convert_file(clp_s::Path const& path, clp::ReaderInterface* reader, std::string_view output_dir) - -> ystdlib::error_handling::Result; + [[nodiscard]] auto convert_file( + clp_s::Path const& path, + clp::ReaderInterface* reader, + std::string_view output_dir, + bool compress_converted_file + ) -> ystdlib::error_handling::Result; private: // Constants diff --git a/components/core/src/clp_s/log_converter/LogSerializer.cpp b/components/core/src/clp_s/log_converter/LogSerializer.cpp index 66ee49feb..e913fc4fa 100644 --- a/components/core/src/clp_s/log_converter/LogSerializer.cpp +++ b/components/core/src/clp_s/log_converter/LogSerializer.cpp @@ -3,11 +3,13 @@ #include #include #include +#include #include #include #include #include #include +#include #include #include @@ -16,17 +18,23 @@ #include #include -#include "../../clp/ffi/ir_stream/Serializer.hpp" -#include "../../clp/ir/types.hpp" -#include "../FileWriter.hpp" +#include +#include +#include +#include +#include namespace clp_s::log_converter { namespace { constexpr msgpack::object_map cEmptyMap{.size = 0U, .ptr = nullptr}; +constexpr std::string_view cUncompressedFileExtension{".clp"}; } // namespace -auto LogSerializer::create(std::string_view output_dir, std::string_view original_file_path) - -> ystdlib::error_handling::Result { +auto LogSerializer::create( + std::string_view output_dir, + std::string_view original_file_path, + bool compress_with_zstd +) -> ystdlib::error_handling::Result { nlohmann::json metadata; metadata.emplace(cOriginalFileMetadataKey, original_file_path); auto serializer{YSTDLIB_ERROR_HANDLING_TRYX( @@ -36,16 +44,34 @@ auto LogSerializer::create(std::string_view output_dir, std::string_view origina )}; boost::uuids::random_generator uuid_generator; - std::string const file_name{boost::uuids::to_string(uuid_generator()) + ".clp"}; + auto file_extension{ + compress_with_zstd ? clp::ir::cIrFileExtension : cUncompressedFileExtension + }; + std::string const file_name{ + boost::uuids::to_string(uuid_generator()) + std::string{file_extension} + }; auto const converted_path{std::filesystem::path{output_dir} / file_name}; - clp_s::FileWriter writer; + std::vector> nested_writers; try { - writer.open(converted_path, clp_s::FileWriter::OpenMode::CreateForWriting); + auto writer{std::make_unique()}; + writer->open(converted_path, clp::FileWriter::OpenMode::CREATE_FOR_WRITING); + nested_writers.emplace_back(std::move(writer)); } catch (std::exception const&) { return std::errc::no_such_file_or_directory; } - return LogSerializer{std::move(serializer), std::move(writer)}; + if (false == compress_with_zstd) { + return LogSerializer{std::move(serializer), std::move(nested_writers)}; + } + + try { + auto compressor{std::make_unique()}; + compressor->open(*nested_writers.back().get()); + nested_writers.emplace_back(std::move(compressor)); + } catch (std::exception const&) { + return std::errc::protocol_error; + } + return LogSerializer{std::move(serializer), std::move(nested_writers)}; } auto LogSerializer::add_message(std::string_view timestamp, std::string_view message) diff --git a/components/core/src/clp_s/log_converter/LogSerializer.hpp b/components/core/src/clp_s/log_converter/LogSerializer.hpp index 17212beb3..82fa13fb8 100644 --- a/components/core/src/clp_s/log_converter/LogSerializer.hpp +++ b/components/core/src/clp_s/log_converter/LogSerializer.hpp @@ -2,16 +2,20 @@ #define CLP_S_LOG_CONVERTER_LOGSERIALIZER_HPP #include +#include #include #include +#include #include -#include "../../clp/ffi/ir_stream/protocol_constants.hpp" -#include "../../clp/ffi/ir_stream/Serializer.hpp" -#include "../../clp/ir/types.hpp" -#include "../../clp/type_utils.hpp" -#include "../FileWriter.hpp" +#include +#include +#include +#include +#include +#include +#include namespace clp_s::log_converter { /** @@ -24,14 +28,18 @@ class LogSerializer { * Creates an instance of `LogSerializer`. * @param output_dir The destination directory for generated KV-IR. * @param original_file_path The original path for the file being converted to KV-IR. + * @param compress_with_zstd Whether the output KV-IR should be zstd-compressed. * @return A result containing a `LogSerializer` on success, or an error code indicating the * failure: - * - std::errc::no_such_file_or_directory if a `clp_s::FileWriter` fails to open an output file. + * - std::errc::no_such_file_or_directory if a `clp::FileWriter` fails to open an output file. + * - std::errc::protocol_error if a `clp::zstd::Compressor` fails to open a compression stream. * - Forwards `clp::ffi::ir_stream::Serializer<>::create()`'s return values. */ - [[nodiscard]] static auto - create(std::string_view output_dir, std::string_view original_file_path) - -> ystdlib::error_handling::Result; + [[nodiscard]] static auto create( + std::string_view output_dir, + std::string_view original_file_path, + bool compress_with_zstd + ) -> ystdlib::error_handling::Result; // Constructors // Delete copy constructor and assignment operator @@ -74,8 +82,18 @@ class LogSerializer { */ void close() { flush_buffer(); - m_writer.write_numeric_value(clp::ffi::ir_stream::cProtocol::Eof); - m_writer.close(); + m_nested_writers.back()->write_numeric_value(clp::ffi::ir_stream::cProtocol::Eof); + for (auto it{m_nested_writers.rbegin()}; it != m_nested_writers.rend(); ++it) { + if (auto compressor{dynamic_cast(it->get())}; + nullptr != compressor) + { + compressor->close(); + } else if (auto file_writer{dynamic_cast(it->get())}; + nullptr != file_writer) + { + file_writer->close(); + } + } } private: @@ -88,10 +106,10 @@ class LogSerializer { // Constructors explicit LogSerializer( clp::ffi::ir_stream::Serializer&& serializer, - clp_s::FileWriter&& writer + std::vector>&& nested_writers ) : m_serializer{std::move(serializer)}, - m_writer{std::move(writer)} {} + m_nested_writers{std::move(nested_writers)} {} // Methods /** @@ -99,7 +117,7 @@ class LogSerializer { */ void flush_buffer() { auto const buffer{m_serializer.get_ir_buf_view()}; - m_writer.write( + m_nested_writers.back()->write( clp::size_checked_pointer_cast(buffer.data()), buffer.size_bytes() ); @@ -107,7 +125,11 @@ class LogSerializer { } clp::ffi::ir_stream::Serializer m_serializer; - clp_s::FileWriter m_writer; + // Nested writers are ordered from closest to furthest from output sink. Typically, this will + // look like `FileWriter` <- `Compressor`. + // NOTE: This class depends on there being at least one writer in `m_nested_writers` at all + // times. + std::vector> m_nested_writers; }; } // namespace clp_s::log_converter diff --git a/components/core/src/clp_s/log_converter/log_converter.cpp b/components/core/src/clp_s/log_converter/log_converter.cpp index fe2f26fbc..f8f3af5b9 100644 --- a/components/core/src/clp_s/log_converter/log_converter.cpp +++ b/components/core/src/clp_s/log_converter/log_converter.cpp @@ -69,7 +69,8 @@ auto convert_files(CommandLineArguments const& command_line_arguments) -> bool { auto const convert_result{log_converter.convert_file( path, nested_readers.back().get(), - command_line_arguments.get_output_dir() + command_line_arguments.get_output_dir(), + command_line_arguments.get_compress_converted_files() )}; if (convert_result.has_error()) { auto const& error{convert_result.error()};