diff --git a/CHANGELOG.md b/CHANGELOG.md index acb9212..a609419 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ # Changelog +## 0.41.0 - 2025-08-12 + +### Enhancements +- Added static `Builder()` methods to the clients +- Improved debug logging in live clients +- Added `PUBLISHER_SPECIFIC` flag + +### Breaking changes +- Removed unused `Received` variant from `JobState` enum + ## 0.40.0 - 2025-07-29 ### Enhancements diff --git a/CMakeLists.txt b/CMakeLists.txt index 6741c4c..7267dbf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ cmake_minimum_required(VERSION 3.24..4.0) project( databento - VERSION 0.40.0 + VERSION 0.41.0 LANGUAGES CXX DESCRIPTION "Official Databento client library" ) diff --git a/README.md b/README.md index fe24e01..e4bd536 100644 --- a/README.md +++ b/README.md @@ -97,27 +97,26 @@ Here is a simple program that fetches 10 seconds of trades for all ES mini futur #include #include -using namespace databento; +namespace db = databento; int main() { - PitSymbolMap symbol_mappings; + db::PitSymbolMap symbol_mappings; - auto client = LiveBuilder{} + auto client = db::LiveThreaded::Builder() .SetKeyFromEnv() - .SetDataset(Dataset::GlbxMdp3) + .SetDataset(db::Dataset::GlbxMdp3) .BuildThreaded(); - auto handler = [&symbol_mappings](const Record& rec) { + auto handler = [&symbol_mappings](const db::Record& rec) { symbol_mappings.OnRecord(rec); - if (const auto* trade = rec.GetIf()) { - std::cout << "Received trade for " - << symbol_mappings[trade->hd.instrument_id] << ':' << *trade - << '\n'; + if (const auto* trade = rec.GetIf()) { + std::cout << "Received trade for " << symbol_mappings[trade->hd.instrument_id] + << ':' << *trade << '\n'; } - return KeepGoing::Continue; + return db::KeepGoing::Continue; }; - client.Subscribe({"ES.FUT"}, Schema::Trades, SType::Parent); + client.Subscribe({"ES.FUT"}, db::Schema::Trades, db::SType::Parent); client.Start(handler); std::this_thread::sleep_for(std::chrono::seconds{10}); return 0; @@ -135,24 +134,23 @@ Here is a simple program that fetches 10 minutes worth of historical trades for #include #include -using namespace databento; +namespace db = databento; int main() { - auto client = HistoricalBuilder{}.SetKey("$YOUR_API_KEY").Build(); - TsSymbolMap symbol_map; - auto decode_symbols = [&symbol_map](const Metadata& metadata) { + auto client = db::Historical::Builder().SetKey("$YOUR_API_KEY").Build(); + db::TsSymbolMap symbol_map; + auto decode_symbols = [&symbol_map](const db::Metadata& metadata) { symbol_map = metadata.CreateSymbolMap(); }; - auto print_trades = [&symbol_map](const Record& record) { - const auto& trade_msg = record.Get(); - std::cout << "Received trade for " << symbol_map.At(trade_msg) << ": " - << trade_msg << '\n'; - return KeepGoing::Continue; + auto print_trades = [&symbol_map](const db::Record& record) { + const auto& trade_msg = record.Get(); + std::cout << "Received trade for " << symbol_map.At(trade_msg) << ": " << trade_msg + << '\n'; + return db::KeepGoing::Continue; }; - client.TimeseriesGetRange( - "GLBX.MDP3", {"2022-06-10T14:30", "2022-06-10T14:40"}, kAllSymbols, - Schema::Trades, SType::RawSymbol, SType::InstrumentId, {}, decode_symbols, - print_trades); + client.TimeseriesGetRange("GLBX.MDP3", {"2022-06-10T14:30", "2022-06-10T14:40"}, + {"ESM2", "NQZ2"}, db::Schema::Trades, db::SType::RawSymbol, + db::SType::InstrumentId, {}, decode_symbols, print_trades); } ``` diff --git a/examples/historical/batch.cpp b/examples/historical/batch.cpp index 57444aa..e669156 100644 --- a/examples/historical/batch.cpp +++ b/examples/historical/batch.cpp @@ -3,20 +3,22 @@ #include // find_if #include -#include "databento/constants.hpp" #include "databento/historical.hpp" +#include "databento/publishers.hpp" // Dataset + +namespace db = databento; int main() { - auto client = databento::HistoricalBuilder{}.SetKeyFromEnv().Build(); + auto client = db::Historical::Builder().SetKeyFromEnv().Build(); const auto job = - client.BatchSubmitJob(databento::dataset::kGlbxMdp3, {"GEZ2"}, - databento::Schema::Trades, {"2022-08-26", "2022-09-27"}); + client.BatchSubmitJob(db::ToString(db::Dataset::GlbxMdp3), {"GEZ2"}, + db::Schema::Trades, {"2022-08-26", "2022-09-27"}); const auto all_jobs = client.BatchListJobs(); - const auto all_job_it = std::find_if( - all_jobs.begin(), all_jobs.end(), - [&job](const databento::BatchJob& a_job) { return job.id == a_job.id; }); + const auto all_job_it = + std::find_if(all_jobs.begin(), all_jobs.end(), + [&job](const db::BatchJob& a_job) { return job.id == a_job.id; }); if (all_job_it == all_jobs.end()) { std::cout << "Couldn't find submitted job\n"; diff --git a/examples/historical/metadata.cpp b/examples/historical/metadata.cpp index f52985e..c11e8c8 100644 --- a/examples/historical/metadata.cpp +++ b/examples/historical/metadata.cpp @@ -1,14 +1,16 @@ #include #include -#include "databento/constants.hpp" #include "databento/enums.hpp" #include "databento/historical.hpp" +#include "databento/publishers.hpp" + +namespace db = databento; int main() { - using databento::dataset::kGlbxMdp3; + const char* glbx_dataset = db::ToString(db::Dataset::GlbxMdp3); - auto client = databento::HistoricalBuilder{}.SetKeyFromEnv().Build(); + auto client = db::Historical::Builder().SetKeyFromEnv().Build(); const auto publishers = client.MetadataListPublishers(); std::cout << "Publishers:\n"; @@ -24,15 +26,14 @@ int main() { } std::cout << '\n'; - const auto schemas = client.MetadataListSchemas(kGlbxMdp3); + const auto schemas = client.MetadataListSchemas(glbx_dataset); std::cout << "Schemas(GLBX):\n"; for (const auto& schema : schemas) { std::cout << "- " << schema << '\n'; } std::cout << '\n'; - const auto fields = - client.MetadataListFields(databento::Encoding::Dbn, databento::Schema::Trades); + const auto fields = client.MetadataListFields(db::Encoding::Dbn, db::Schema::Trades); std::cout << "Fields:\n"; for (const auto& field_detail : fields) { std::cout << "- " << field_detail << '\n'; @@ -47,7 +48,7 @@ int main() { } std::cout << '\n'; - const auto all_unit_prices = client.MetadataListUnitPrices(kGlbxMdp3); + const auto all_unit_prices = client.MetadataListUnitPrices(glbx_dataset); std::cout << "Unit prices:\n"; for (const auto& [mode, unit_prices] : all_unit_prices) { const auto* mode_str = ToString(mode); @@ -58,16 +59,16 @@ int main() { std::cout << '\n'; const auto record_count = client.MetadataGetRecordCount( - kGlbxMdp3, {"2020-12-28", "2020-12-29"}, {"ESH1"}, databento::Schema::Mbo); + glbx_dataset, {"2020-12-28", "2020-12-29"}, {"ESH1"}, db::Schema::Mbo); std::cout << "Record count: " << record_count << "\n\n"; const std::size_t billable_size = client.MetadataGetBillableSize( - kGlbxMdp3, {"2020-12-28", "2020-12-29"}, {"ESH1"}, databento::Schema::Mbo, - databento::SType::RawSymbol, {}); + glbx_dataset, {"2020-12-28", "2020-12-29"}, {"ESH1"}, db::Schema::Mbo, + db::SType::RawSymbol, {}); std::cout << "Billable size (uncompressed binary bytes): " << billable_size << "\n\n"; - const auto cost = client.MetadataGetCost(kGlbxMdp3, {"2020-12-28", "2020-12-29"}, - {"ESH1"}, databento::Schema::Mbo); + const auto cost = client.MetadataGetCost(glbx_dataset, {"2020-12-28", "2020-12-29"}, + {"ESH1"}, db::Schema::Mbo); std::cout << "Cost (in cents): " << cost << '\n'; return 0; diff --git a/examples/historical/readme.cpp b/examples/historical/readme.cpp index b3d4443..cdad594 100644 --- a/examples/historical/readme.cpp +++ b/examples/historical/readme.cpp @@ -1,27 +1,25 @@ // Duplicate of the example usage code from the README.md to ensure // it compiles and to be able to clang-format it. -// NOLINTBEGIN(google-build-using-namespace) #include #include #include #include -using namespace databento; +namespace db = databento; int main() { - auto client = HistoricalBuilder{}.SetKey("$YOUR_API_KEY").Build(); - TsSymbolMap symbol_map; - auto decode_symbols = [&symbol_map](const Metadata& metadata) { + auto client = db::Historical::Builder().SetKey("$YOUR_API_KEY").Build(); + db::TsSymbolMap symbol_map; + auto decode_symbols = [&symbol_map](const db::Metadata& metadata) { symbol_map = metadata.CreateSymbolMap(); }; - auto print_trades = [&symbol_map](const Record& record) { - const auto& trade_msg = record.Get(); + auto print_trades = [&symbol_map](const db::Record& record) { + const auto& trade_msg = record.Get(); std::cout << "Received trade for " << symbol_map.At(trade_msg) << ": " << trade_msg << '\n'; - return KeepGoing::Continue; + return db::KeepGoing::Continue; }; client.TimeseriesGetRange("GLBX.MDP3", {"2022-06-10T14:30", "2022-06-10T14:40"}, - {"ESM2", "NQZ2"}, Schema::Trades, SType::RawSymbol, - SType::InstrumentId, {}, decode_symbols, print_trades); + {"ESM2", "NQZ2"}, db::Schema::Trades, db::SType::RawSymbol, + db::SType::InstrumentId, {}, decode_symbols, print_trades); } -// NOLINTEND(google-build-using-namespace) diff --git a/examples/historical/symbology_resolve.cpp b/examples/historical/symbology_resolve.cpp index 3b6c259..6493a17 100644 --- a/examples/historical/symbology_resolve.cpp +++ b/examples/historical/symbology_resolve.cpp @@ -6,24 +6,25 @@ #include "databento/historical.hpp" #include "databento/symbology.hpp" +namespace db = databento; + int main(int argc, char* argv[]) { if (argc < 6) { std::cerr << "USAGE: symbology-resolve " " \n"; return 1; } - const auto stype_in = databento::FromString(argv[2]); - const auto stype_out = databento::FromString(argv[3]); + const auto stype_in = db::FromString(argv[2]); + const auto stype_out = db::FromString(argv[3]); std::vector symbols; for (int i = 5; i < argc; ++i) { symbols.emplace_back(argv[i]); } - auto client = databento::HistoricalBuilder{}.SetKeyFromEnv().Build(); - const databento::SymbologyResolution resolution = - client.SymbologyResolve(argv[1], symbols, stype_in, stype_out, - databento::DateTimeRange{argv[4]}); + auto client = db::Historical::Builder().SetKeyFromEnv().Build(); + const db::SymbologyResolution resolution = client.SymbologyResolve( + argv[1], symbols, stype_in, stype_out, db::DateTimeRange{argv[4]}); std::cout << resolution << '\n'; return 0; diff --git a/examples/historical/timeseries_get_range.cpp b/examples/historical/timeseries_get_range.cpp index d94aa86..1ee9fe9 100644 --- a/examples/historical/timeseries_get_range.cpp +++ b/examples/historical/timeseries_get_range.cpp @@ -4,19 +4,19 @@ #include "databento/enums.hpp" #include "databento/historical.hpp" +namespace db = databento; + int main() { - auto client = databento::HistoricalBuilder{}.SetKeyFromEnv().Build(); + auto client = db::Historical::Builder().SetKeyFromEnv().Build(); const auto limit = 1000; client.TimeseriesGetRange( - databento::dataset::kGlbxMdp3, - databento::DateTimeRange{"2022-10-03"}, {"ESZ2"}, - databento::Schema::Trades, databento::SType::RawSymbol, - databento::SType::InstrumentId, limit, - [](databento::Metadata&& metadata) { std::cout << metadata << '\n'; }, - [](const databento::Record& record) { - const auto& trade_msg = record.Get(); + db::dataset::kGlbxMdp3, db::DateTimeRange{"2022-10-03"}, {"ESZ2"}, + db::Schema::Trades, db::SType::RawSymbol, db::SType::InstrumentId, limit, + [](db::Metadata&& metadata) { std::cout << metadata << '\n'; }, + [](const db::Record& record) { + const auto& trade_msg = record.Get(); std::cout << trade_msg << '\n'; - return databento::KeepGoing::Continue; + return db::KeepGoing::Continue; }); return 0; diff --git a/examples/historical/timeseries_get_range_to_file.cpp b/examples/historical/timeseries_get_range_to_file.cpp index 2700c75..5165b0d 100644 --- a/examples/historical/timeseries_get_range_to_file.cpp +++ b/examples/historical/timeseries_get_range_to_file.cpp @@ -1,8 +1,3 @@ -#include -#include -#include -#include // setw - #include "databento/constants.hpp" #include "databento/datetime.hpp" #include "databento/dbn_file_store.hpp" @@ -10,17 +5,19 @@ #include "databento/historical.hpp" #include "databento/record.hpp" +namespace db = databento; + int main() { - auto client = databento::HistoricalBuilder{}.SetKeyFromEnv().Build(); + auto client = db::Historical::Builder().SetKeyFromEnv().Build(); const auto limit = 1000; - databento::DbnFileStore dbn_file_store = client.TimeseriesGetRangeToFile( - databento::dataset::kGlbxMdp3, {"2022-10-03T00:00", "2022-10-04T00:00"}, {"ESZ2"}, - databento::Schema::Ohlcv1M, databento::SType::RawSymbol, - databento::SType::InstrumentId, limit, "ESZ2-ohlcv1m-20201003-20201004.dbn.zst"); - dbn_file_store.Replay([](const databento::Record record) { - const auto& ohlcv_bar = record.Get(); + db::DbnFileStore dbn_file_store = client.TimeseriesGetRangeToFile( + db::dataset::kGlbxMdp3, {"2022-10-03T00:00", "2022-10-04T00:00"}, {"ESZ2"}, + db::Schema::Ohlcv1M, db::SType::RawSymbol, db::SType::InstrumentId, limit, + "ESZ2-ohlcv1m-20201003-20201004.dbn.zst"); + dbn_file_store.Replay([](const db::Record record) { + const auto& ohlcv_bar = record.Get(); std::cout << ohlcv_bar << '\n'; - return databento::KeepGoing::Continue; + return db::KeepGoing::Continue; }); return 0; } diff --git a/examples/live/live_smoke_test.cpp b/examples/live/live_smoke_test.cpp index fc05090..7f4e93a 100644 --- a/examples/live/live_smoke_test.cpp +++ b/examples/live/live_smoke_test.cpp @@ -12,7 +12,7 @@ #include #include -using namespace databento; +namespace db = databento; std::vector SplitSymbols(const std::string& symbols) { std::vector result; @@ -26,26 +26,27 @@ std::vector SplitSymbols(const std::string& symbols) { return result; } -std::pair TryConvertToUnixNanos(const char* start) { +std::pair TryConvertToUnixNanos(const char* start) { std::size_t pos; const uint64_t result = std::stoul(start, &pos, 10); if (pos != std::strlen(start)) { - return std::make_pair(false, UnixNanos{}); + return std::make_pair(false, db::UnixNanos{}); } - return std::make_pair(true, UnixNanos{std::chrono::nanoseconds(result)}); + return std::make_pair(true, db::UnixNanos{std::chrono::nanoseconds(result)}); } -void ProcessRecords(LiveBlocking& client, Schema schema, bool start_from_epoch) { +void ProcessRecords(db::LiveBlocking& client, db::Schema schema, + bool start_from_epoch) { client.Start(); std::cout << "Starting client...\n"; // For start != 0 we stop at SymbolMappingMsg so that the tests can be run // outside trading hours - auto expected_rtype = Record::RTypeFromSchema(schema); + auto expected_rtype = db::Record::RTypeFromSchema(schema); if (!start_from_epoch) { - expected_rtype = databento::RType::SymbolMapping; + expected_rtype = db::RType::SymbolMapping; } constexpr auto timeout = std::chrono::seconds{30}; @@ -54,7 +55,7 @@ void ProcessRecords(LiveBlocking& client, Schema schema, bool start_from_epoch) if (record->RType() == expected_rtype) { std::cout << "Received expected record type " << expected_rtype << '\n'; break; - } else if (auto* msg = record->GetIf()) { + } else if (auto* msg = record->GetIf()) { std::stringstream ss; ss << "Received error " << msg->Err() << '\n'; std::cerr << ss.str(); @@ -65,26 +66,26 @@ void ProcessRecords(LiveBlocking& client, Schema schema, bool start_from_epoch) std::cout << "Finished client\n"; } -void ProcessSnapshotRecords(LiveBlocking& client, Schema schema) { +void ProcessSnapshotRecords(db::LiveBlocking& client, db::Schema schema) { client.Start(); std::cout << "Starting client...\n"; - const auto expected_rtype = Record::RTypeFromSchema(schema); + const auto expected_rtype = db::Record::RTypeFromSchema(schema); constexpr auto timeout = std::chrono::seconds{30}; auto received_snapshot_record = false; while (auto record = client.NextRecord(timeout)) { - if (auto* mbo_msg = record->GetIf()) { + if (auto* mbo_msg = record->GetIf()) { if (mbo_msg->flags.IsSnapshot()) { received_snapshot_record = true; } else { std::cout << "Received expected record type " << expected_rtype << '\n'; break; } - } else if (auto* error_msg = record->GetIf()) { + } else if (auto* error_msg = record->GetIf()) { std::stringstream ss; ss << "Received error " << error_msg->Err() << '\n'; throw std::runtime_error(ss.str()); @@ -137,9 +138,7 @@ class ArgParser { std::vector args; }; -ArgParser ParseArgs(int argc, char* argv[] - -) { +ArgParser ParseArgs(int argc, char* argv[]) { ArgParser parser; parser.Add(ArgParser::Arg{"gateway", "--gateway"}); parser.Add(ArgParser::Arg{"port", "--port", "13000"}); @@ -163,9 +162,9 @@ int main(int argc, char* argv[]) { const auto gateway = parser.Get("gateway"); const auto port = std::atoi(parser.Get("port")); const auto api_key_env_var = parser.Get("api_key_env_var"); - const auto dataset = FromString(parser.Get("dataset")); - const auto schema = FromString(parser.Get("schema")); - const auto stype = FromString(parser.Get("stype")); + const auto dataset = db::FromString(parser.Get("dataset")); + const auto schema = db::FromString(parser.Get("schema")); + const auto stype = db::FromString(parser.Get("stype")); const auto symbols = SplitSymbols(parser.Get("symbols")); const auto start = parser.Get("start"); const auto use_snapshot = std::atoi(parser.Get("use_snapshot")); @@ -173,7 +172,7 @@ int main(int argc, char* argv[]) { const auto api_key = std::getenv(api_key_env_var); assert(api_key); - auto client = LiveBuilder{} + auto client = db::LiveBlocking::Builder() .SetAddress(gateway, static_cast(port)) .SetKey(std::string{api_key}) .SetDataset(dataset) diff --git a/examples/live/readme.cpp b/examples/live/readme.cpp index e32fcea..abf3d9f 100644 --- a/examples/live/readme.cpp +++ b/examples/live/readme.cpp @@ -1,32 +1,32 @@ // Duplicate of the example usage code from the README.md to ensure // it compiles and to be able to clang-format it. -// NOLINTBEGIN(google-build-using-namespace) #include #include #include #include #include -using namespace databento; +namespace db = databento; int main() { - PitSymbolMap symbol_mappings; + db::PitSymbolMap symbol_mappings; - auto client = - LiveBuilder{}.SetKeyFromEnv().SetDataset(Dataset::GlbxMdp3).BuildThreaded(); + auto client = db::LiveThreaded::Builder() + .SetKeyFromEnv() + .SetDataset(db::Dataset::GlbxMdp3) + .BuildThreaded(); - auto handler = [&symbol_mappings](const Record& rec) { + auto handler = [&symbol_mappings](const db::Record& rec) { symbol_mappings.OnRecord(rec); - if (const auto* trade = rec.GetIf()) { + if (const auto* trade = rec.GetIf()) { std::cout << "Received trade for " << symbol_mappings[trade->hd.instrument_id] << ':' << *trade << '\n'; } - return KeepGoing::Continue; + return db::KeepGoing::Continue; }; - client.Subscribe({"ES.FUT"}, Schema::Trades, SType::Parent); + client.Subscribe({"ES.FUT"}, db::Schema::Trades, db::SType::Parent); client.Start(handler); std::this_thread::sleep_for(std::chrono::seconds{10}); return 0; } -// NOLINTEND(google-build-using-namespace) diff --git a/examples/live/simple.cpp b/examples/live/simple.cpp index 37a73fb..85f0316 100644 --- a/examples/live/simple.cpp +++ b/examples/live/simple.cpp @@ -12,60 +12,60 @@ #include #include +namespace db = databento; + static std::sig_atomic_t volatile gSignal; int main() { - databento::PitSymbolMap symbol_mappings; - auto log_receiver = - std::make_unique(databento::LogLevel::Debug); + db::PitSymbolMap symbol_mappings; + auto log_receiver = std::make_unique(db::LogLevel::Debug); - auto client = databento::LiveBuilder{} + auto client = db::LiveThreaded::Builder() .SetLogReceiver(log_receiver.get()) .SetSendTsOut(true) .SetKeyFromEnv() - .SetDataset(databento::Dataset::GlbxMdp3) + .SetDataset(db::Dataset::GlbxMdp3) .BuildThreaded(); // Set up signal handler for Ctrl+C std::signal(SIGINT, [](int signal) { gSignal = signal; }); std::vector symbols{"ESZ5", "ESZ5 C6200", "ESZ5 P5500"}; - client.Subscribe(symbols, databento::Schema::Definition, databento::SType::RawSymbol); - client.Subscribe(symbols, databento::Schema::Mbo, databento::SType::RawSymbol); + client.Subscribe(symbols, db::Schema::Definition, db::SType::RawSymbol); + client.Subscribe(symbols, db::Schema::Mbo, db::SType::RawSymbol); - auto metadata_callback = [](databento::Metadata&& metadata) { + auto metadata_callback = [](db::Metadata&& metadata) { std::cout << metadata << '\n'; }; - auto record_callback = [&symbol_mappings](const databento::Record& rec) { - using databento::RType; + auto record_callback = [&symbol_mappings](const db::Record& rec) { + using db::RType; switch (rec.RType()) { case RType::Mbo: { - auto ohlcv = rec.Get>(); + auto ohlcv = rec.Get>(); std::cout << "Received tick for " << symbol_mappings[ohlcv.rec.hd.instrument_id] << " with ts_out " << ohlcv.ts_out.time_since_epoch().count() << ": " << ohlcv.rec << '\n'; break; } case RType::InstrumentDef: { - std::cout << "Received definition: " << rec.Get() - << '\n'; + std::cout << "Received definition: " << rec.Get() << '\n'; break; } case RType::SymbolMapping: { - auto mapping = rec.Get(); + auto mapping = rec.Get(); symbol_mappings.OnSymbolMapping(mapping); break; } case RType::System: { - const auto& system_msg = rec.Get(); + const auto& system_msg = rec.Get(); if (!system_msg.IsHeartbeat()) { std::cout << "Received system msg: " << system_msg.Msg() << '\n'; } break; } case RType::Error: { - std::cerr << "Received error from gateway: " - << rec.Get().Err() << '\n'; + std::cerr << "Received error from gateway: " << rec.Get().Err() + << '\n'; break; } default: { @@ -73,7 +73,7 @@ int main() { << static_cast(rec.RType()) << '\n'; } } - return databento::KeepGoing::Continue; + return db::KeepGoing::Continue; }; client.Start(metadata_callback, record_callback); while (::gSignal == 0) { diff --git a/include/databento/enums.hpp b/include/databento/enums.hpp index 60c8dc8..46ded96 100644 --- a/include/databento/enums.hpp +++ b/include/databento/enums.hpp @@ -34,7 +34,6 @@ enum class Delivery : std::uint8_t { // The current state of a batch job. enum class JobState : std::uint8_t { - Received, Queued, Processing, Done, diff --git a/include/databento/flag_set.hpp b/include/databento/flag_set.hpp index e933412..8ba67b3 100644 --- a/include/databento/flag_set.hpp +++ b/include/databento/flag_set.hpp @@ -24,6 +24,8 @@ class FlagSet { static constexpr Repr kBadTsRecv = 1 << 3; // Indicates an unrecoverable gap was detected in the channel. static constexpr Repr kMaybeBadBook = 1 << 2; + // Indicates a publisher-specific event. + static constexpr Repr kPublisherSpecific = 1 << 1; friend std::ostream& operator<<(std::ostream&, FlagSet); @@ -77,11 +79,16 @@ class FlagSet { bits_.maybe_bad_book = true; return *this; } + constexpr bool IsPublisherSpecific() const { return bits_.publisher_specific; } + FlagSet SetPublisherSpecific() { + bits_.publisher_specific = true; + return *this; + } private: struct BitFlags { bool reserved0 : 1; - bool reserved1 : 1; + bool publisher_specific : 1; bool maybe_bad_book : 1; bool bad_ts_recv : 1; bool mbp : 1; diff --git a/include/databento/historical.hpp b/include/databento/historical.hpp index 6d117a1..7af9f22 100644 --- a/include/databento/historical.hpp +++ b/include/databento/historical.hpp @@ -23,6 +23,8 @@ class ILogReceiver; // A client for interfacing with Databento's historical market data API. class Historical { public: + static HistoricalBuilder Builder(); + // WARNING: Will be deprecated in the future in favor of the builder Historical(ILogReceiver* log_receiver, std::string key, HistoricalGateway gateway); diff --git a/include/databento/live_blocking.hpp b/include/databento/live_blocking.hpp index 572369d..5995c09 100644 --- a/include/databento/live_blocking.hpp +++ b/include/databento/live_blocking.hpp @@ -29,6 +29,8 @@ class LiveThreaded; // particular dataset. class LiveBlocking { public: + static LiveBuilder Builder(); + /* * Getters */ diff --git a/include/databento/live_threaded.hpp b/include/databento/live_threaded.hpp index 78f57b7..fb44ee0 100644 --- a/include/databento/live_threaded.hpp +++ b/include/databento/live_threaded.hpp @@ -35,6 +35,8 @@ class LiveThreaded { }; using ExceptionCallback = std::function; + static LiveBuilder Builder(); + LiveThreaded(const LiveThreaded&) = delete; LiveThreaded& operator=(const LiveThreaded&) = delete; LiveThreaded(LiveThreaded&& other) noexcept; diff --git a/pkg/PKGBUILD b/pkg/PKGBUILD index c378963..27752cb 100644 --- a/pkg/PKGBUILD +++ b/pkg/PKGBUILD @@ -1,7 +1,7 @@ # Maintainer: Databento _pkgname=databento-cpp pkgname=databento-cpp-git -pkgver=0.40.0 +pkgver=0.41.0 pkgrel=1 pkgdesc="Official C++ client for Databento" arch=('any') diff --git a/src/enums.cpp b/src/enums.cpp index 975dd6f..084b4aa 100644 --- a/src/enums.cpp +++ b/src/enums.cpp @@ -70,9 +70,6 @@ const char* ToString(Delivery delivery) { const char* ToString(JobState state) { switch (state) { - case JobState::Received: { - return "received"; - } case JobState::Queued: { return "queued"; } @@ -957,9 +954,6 @@ Delivery FromString(const std::string& str) { template <> JobState FromString(const std::string& str) { - if (str == "received") { - return JobState::Received; - } if (str == "queued") { return JobState::Queued; } diff --git a/src/flag_set.cpp b/src/flag_set.cpp index 1c16fb0..fd2a650 100644 --- a/src/flag_set.cpp +++ b/src/flag_set.cpp @@ -6,7 +6,7 @@ namespace databento { std::ostream& operator<<(std::ostream& stream, FlagSet flag_set) { - const std::array, 6> + const std::array, 7> kFlagsAndNames = {{ {&FlagSet::IsLast, "LAST"}, {&FlagSet::IsTob, "TOB"}, @@ -14,6 +14,7 @@ std::ostream& operator<<(std::ostream& stream, FlagSet flag_set) { {&FlagSet::IsMbp, "MBP"}, {&FlagSet::IsBadTsRecv, "BAD_TS_RECV"}, {&FlagSet::IsMaybeBadBook, "MAYBE_BAD_BOOK"}, + {&FlagSet::IsPublisherSpecific, "PUBLISHER_SPECIFIC"}, }}; bool has_written_flag = false; diff --git a/src/historical.cpp b/src/historical.cpp index 6432565..0ced860 100644 --- a/src/historical.cpp +++ b/src/historical.cpp @@ -121,6 +121,10 @@ void TryCreateDir(const std::filesystem::path& dir_name) { } } // namespace +databento::HistoricalBuilder Historical::Builder() { + return databento::HistoricalBuilder{}; +} + Historical::Historical(ILogReceiver* log_receiver, std::string key, HistoricalGateway gateway) : log_receiver_{log_receiver}, @@ -227,7 +231,7 @@ databento::BatchJob Historical::BatchSubmitJob(const httplib::Params& params) { std::vector Historical::BatchListJobs() { static const std::vector kDefaultStates = { - JobState::Received, JobState::Queued, JobState::Processing, JobState::Done}; + JobState::Queued, JobState::Processing, JobState::Done}; return this->BatchListJobs(kDefaultStates, UnixNanos{}); } std::vector Historical::BatchListJobs( diff --git a/src/live_blocking.cpp b/src/live_blocking.cpp index b271557..f7e2f01 100644 --- a/src/live_blocking.cpp +++ b/src/live_blocking.cpp @@ -16,6 +16,7 @@ #include "databento/dbn_decoder.hpp" #include "databento/detail/tcp_client.hpp" #include "databento/exceptions.hpp" // LiveApiError +#include "databento/live.hpp" // LiveBuilder #include "databento/log.hpp" // ILogReceiver #include "databento/record.hpp" // Record #include "databento/symbology.hpp" // JoinSymbolStrings @@ -27,6 +28,8 @@ namespace { constexpr std::size_t kBucketIdLength = 5; } // namespace +databento::LiveBuilder LiveBlocking::Builder() { return databento::LiveBuilder{}; } + LiveBlocking::LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset, bool send_ts_out, VersionUpgradePolicy upgrade_policy, @@ -115,7 +118,7 @@ void LiveBlocking::SubscribeWithSnapshot(const std::vector& symbols void LiveBlocking::Subscribe(std::string_view sub_msg, const std::vector& symbols, bool use_snapshot) { - static constexpr auto kMethodName = "Live::Subscribe"; + static constexpr auto kMethodName = "LiveBlocking::Subscribe"; constexpr std::ptrdiff_t kSymbolMaxChunkSize = 500; if (symbols.empty()) { @@ -134,6 +137,12 @@ void LiveBlocking::Subscribe(std::string_view sub_msg, << "|snapshot=" << use_snapshot << "|is_last=" << (distance_from_end <= kSymbolMaxChunkSize) << '\n'; + if (log_receiver_->ShouldLog(LogLevel::Debug)) { + std::ostringstream log_ss; + log_ss << '[' << kMethodName + << "] Sending subscription request: " << chunked_sub_msg.str(); + log_receiver_->Receive(LogLevel::Debug, log_ss.str()); + } client_.WriteAll(chunked_sub_msg.str()); symbols_it += chunk_size; @@ -141,6 +150,8 @@ void LiveBlocking::Subscribe(std::string_view sub_msg, } databento::Metadata LiveBlocking::Start() { + log_receiver_->Receive(LogLevel::Info, "[LiveBlocking::Start] Starting session"); + client_.WriteAll("start_session\n"); client_.ReadExact(buffer_.WriteBegin(), kMetadataPreludeSize); buffer_.Fill(kMetadataPreludeSize); @@ -224,6 +235,7 @@ void LiveBlocking::Resubscribe() { } std::string LiveBlocking::DecodeChallenge() { + static constexpr auto kMethodName = "LiveBlocking::DecodeChallenge"; const auto read_size = client_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity()).read_size; if (read_size == 0) { @@ -233,15 +245,16 @@ std::string LiveBlocking::DecodeChallenge() { // first line is version std::string response{reinterpret_cast(buffer_.ReadBegin()), buffer_.ReadCapacity()}; - if (log_receiver_->ShouldLog(LogLevel::Debug)) { - std::ostringstream log_ss; - log_ss << "[LiveBlocking::DecodeChallenge] Challenge: " << response; - log_receiver_->Receive(LogLevel::Debug, log_ss.str()); - } auto first_nl_pos = response.find('\n'); if (first_nl_pos == std::string::npos) { throw LiveApiError::UnexpectedMsg("Received malformed initial message", response); } + if (log_receiver_->ShouldLog(LogLevel::Debug)) { + std::ostringstream log_ss; + log_ss << '[' << kMethodName + << "] Received greeting: " << response.substr(0, first_nl_pos); + log_receiver_->Receive(LogLevel::Debug, log_ss.str()); + } const auto find_start = first_nl_pos + 1; auto next_nl_pos = find_start == response.length() ? std::string::npos : response.find('\n', find_start); @@ -257,6 +270,11 @@ std::string LiveBlocking::DecodeChallenge() { next_nl_pos = response.find('\n', find_start); } const auto challenge_line = response.substr(find_start, next_nl_pos - find_start); + if (log_receiver_->ShouldLog(LogLevel::Debug)) { + std::ostringstream log_ss; + log_ss << '[' << kMethodName << "] Received CRAM challenge: " << challenge_line; + log_receiver_->Receive(LogLevel::Debug, log_ss.str()); + } if (challenge_line.compare(0, 4, "cram") != 0) { throw LiveApiError::UnexpectedMsg("Did not receive CRAM challenge when expected", challenge_line); @@ -279,17 +297,22 @@ std::string LiveBlocking::DetermineGateway() const { } std::uint64_t LiveBlocking::Authenticate() { + static constexpr auto kMethodName = "LiveBlocking::Authenticate"; const std::string challenge_key = DecodeChallenge() + '|' + key_; const std::string auth = GenerateCramReply(challenge_key); const std::string req = EncodeAuthReq(auth); + if (log_receiver_->ShouldLog(LogLevel::Debug)) { + std::ostringstream log_ss; + log_ss << '[' << kMethodName << "] Sending CRAM reply: " << req; + log_receiver_->Receive(LogLevel::Debug, log_ss.str()); + } client_.WriteAll(req); const std::uint64_t session_id = DecodeAuthResp(); if (log_receiver_->ShouldLog(LogLevel::Info)) { std::ostringstream log_ss; - log_ss << "[LiveBlocking::Authenticate] Successfully authenticated with " - "session_id " + log_ss << '[' << kMethodName << "] Successfully authenticated with session_id " << session_id; log_receiver_->Receive(LogLevel::Info, log_ss.str()); } diff --git a/src/live_threaded.cpp b/src/live_threaded.cpp index 86b231c..9dc5824 100644 --- a/src/live_threaded.cpp +++ b/src/live_threaded.cpp @@ -10,8 +10,9 @@ #include // forward, move, swap #include "databento/detail/scoped_thread.hpp" // ScopedThread +#include "databento/live.hpp" // LiveBuilder #include "databento/live_blocking.hpp" // LiveBlocking -#include "databento/log.hpp" +#include "databento/log.hpp" // ILogReceiver using databento::LiveThreaded; @@ -36,6 +37,8 @@ struct LiveThreaded::Impl { LiveBlocking blocking; }; +databento::LiveBuilder LiveThreaded::Builder() { return databento::LiveBuilder{}; } + LiveThreaded::LiveThreaded(LiveThreaded&& other) noexcept : impl_{std::move(other.impl_)}, thread_{std::move(other.thread_)} {} diff --git a/src/log.cpp b/src/log.cpp index 883c236..e244575 100644 --- a/src/log.cpp +++ b/src/log.cpp @@ -27,7 +27,11 @@ ConsoleLogReceiver::ConsoleLogReceiver(LogLevel min_level, std::ostream& stream) void ConsoleLogReceiver::Receive(LogLevel level, const std::string& msg) { if (ShouldLog(level)) { - stream_ << level << ": " << msg << '\n'; + stream_ << level << ": " << msg; + // Don't add a newline if `msg` ends in one + if (msg.empty() || msg.back() != '\n') { + stream_ << '\n'; + } } } diff --git a/tests/src/flag_set_tests.cpp b/tests/src/flag_set_tests.cpp index d578566..211befd 100644 --- a/tests/src/flag_set_tests.cpp +++ b/tests/src/flag_set_tests.cpp @@ -46,7 +46,8 @@ TEST(FlagSetTests, ToStringThreeSet) { TEST(FlagSetTests, ToStringReservedSet) { constexpr FlagSet kTarget{255}; ASSERT_EQ(ToString(kTarget), - "LAST | TOB | SNAPSHOT | MBP | BAD_TS_RECV | MAYBE_BAD_BOOK (255)"); + "LAST | TOB | SNAPSHOT | MBP | BAD_TS_RECV | MAYBE_BAD_BOOK | " + "PUBLISHER_SPECIFIC (255)"); } TEST(FlagSetTests, ConstantBitFieldEquivalence) {