Skip to content

Commit 6fb6f55

Browse files
committed
ADD: Add configurable buffer size for live clients
1 parent 3440981 commit 6fb6f55

File tree

12 files changed

+482
-58
lines changed

12 files changed

+482
-58
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,14 @@
33
## 0.38.0 - TBD
44

55
### Enhancements
6+
- Made the buffer size used by the live clients when reading from the TCP socket
7+
configurable through the `LiveBuilder::SetBufferSize()` method
68
- Added log level prefix to `ConsoleLogReceiver` output
79

810
### Breaking changes
11+
- Live client instances can only be created through the `LiveBuilder` class
12+
- Changed `HeartbeatInterval()` getters on `LiveBlocking` and `LiveThreaded` to return
13+
an `std::optional`
914
- Added new optional `ShouldLog` virtual method to `ILogReceiver` to
1015
filter the levels of log messages that will be sent to the receiver
1116

include/databento/detail/buffer.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
namespace databento::detail {
1212
class Buffer : public IReadable, public IWritable {
1313
public:
14-
Buffer() : Buffer(64 * std::size_t{1 << 10}) {}
14+
static constexpr std::size_t kDefaultBufSize = 64 * std::size_t{1 << 10};
15+
16+
Buffer() : Buffer(kDefaultBufSize) {}
1517
explicit Buffer(std::size_t init_capacity)
1618
: buf_{AlignedNew(init_capacity), AlignedDelete},
1719
end_{buf_.get() + init_capacity},

include/databento/live.hpp

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
#include <chrono>
4+
#include <cstddef>
45
#include <string>
56

67
#include "databento/enums.hpp" // VersionUpgradePolicy
@@ -9,22 +10,32 @@
910
#include "databento/publishers.hpp"
1011

1112
namespace databento {
13+
// Forward declarations
1214
class ILogReceiver;
1315

1416
// A helper class for constructing a Live client, either an instance of
1517
// LiveBlocking or LiveThreaded.
1618
class LiveBuilder {
1719
public:
18-
LiveBuilder() = default;
20+
LiveBuilder();
21+
22+
/*
23+
* Required settters
24+
*/
1925

2026
// Sets `key_` based on the environment variable DATABENTO_API_KEY.
2127
//
2228
// NOTE: This is not thread-safe if `std::setenv` is used elsewhere in the
2329
// program.
2430
LiveBuilder& SetKeyFromEnv();
2531
LiveBuilder& SetKey(std::string key);
26-
LiveBuilder& SetDataset(std::string dataset);
2732
LiveBuilder& SetDataset(Dataset dataset);
33+
LiveBuilder& SetDataset(std::string dataset);
34+
35+
/*
36+
* Optional settters
37+
*/
38+
2839
// Whether to append the gateway send timestamp after each DBN message.
2940
LiveBuilder& SetSendTsOut(bool send_ts_out);
3041
// Set the version upgrade policy for when receiving DBN data from a prior
@@ -36,6 +47,13 @@ class LiveBuilder {
3647
LiveBuilder& SetHeartbeatInterval(std::chrono::seconds heartbeat_interval);
3748
// Overrides the gateway and port. This is an advanced method.
3849
LiveBuilder& SetAddress(std::string gateway, std::uint16_t port);
50+
// Overrides the size of the buffer used for reading data from the TCP socket.
51+
LiveBuilder& SetBufferSize(std::size_t size);
52+
53+
/*
54+
* Build a live client instance
55+
*/
56+
3957
// Attempts to construct an instance of a blocking live client or throws an
4058
// exception.
4159
LiveBlocking BuildBlocking();
@@ -51,8 +69,10 @@ class LiveBuilder {
5169
std::uint16_t port_{};
5270
std::string key_;
5371
std::string dataset_;
72+
5473
bool send_ts_out_{false};
5574
VersionUpgradePolicy upgrade_policy_{VersionUpgradePolicy::UpgradeToV3};
56-
std::chrono::seconds heartbeat_interval_{};
75+
std::optional<std::chrono::seconds> heartbeat_interval_{};
76+
std::size_t buffer_size_;
5777
};
5878
} // namespace databento

include/databento/live_blocking.hpp

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <chrono> // milliseconds
55
#include <cstddef>
66
#include <cstdint>
7+
#include <optional>
78
#include <string>
89
#include <string_view>
910
#include <utility> // pair
@@ -18,21 +19,17 @@
1819
#include "databento/record.hpp" // Record, RecordHeader
1920

2021
namespace databento {
22+
// Forward declaration
2123
class ILogReceiver;
24+
class LiveBuilder;
25+
class LiveThreaded;
2226

2327
// A client for interfacing with Databento's real-time and intraday replay
2428
// market data API. This client provides a blocking API for getting the next
2529
// record. Unlike Historical, each instance of LiveBlocking is associated with a
2630
// particular dataset.
2731
class LiveBlocking {
2832
public:
29-
LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset,
30-
bool send_ts_out, VersionUpgradePolicy upgrade_policy,
31-
std::chrono::seconds heartbeat_interval);
32-
LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset,
33-
std::string gateway, std::uint16_t port, bool send_ts_out,
34-
VersionUpgradePolicy upgrade_policy,
35-
std::chrono::seconds heartbeat_interval);
3633
/*
3734
* Getters
3835
*/
@@ -45,8 +42,8 @@ class LiveBlocking {
4542
VersionUpgradePolicy UpgradePolicy() const { return upgrade_policy_; }
4643
// The the first member of the pair will be true, when the heartbeat interval
4744
// was overridden.
48-
std::pair<bool, std::chrono::seconds> HeartbeatInterval() const {
49-
return {heartbeat_interval_.count() > 0, heartbeat_interval_};
45+
std::optional<std::chrono::seconds> HeartbeatInterval() const {
46+
return heartbeat_interval_;
5047
}
5148
const std::vector<LiveSubscription>& Subscriptions() const {
5249
return subscriptions_;
@@ -93,6 +90,19 @@ class LiveBlocking {
9390
void Resubscribe();
9491

9592
private:
93+
friend LiveBuilder;
94+
friend LiveThreaded;
95+
96+
LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset,
97+
bool send_ts_out, VersionUpgradePolicy upgrade_policy,
98+
std::optional<std::chrono::seconds> heartbeat_interval,
99+
std::size_t buffer_size);
100+
LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset,
101+
std::string gateway, std::uint16_t port, bool send_ts_out,
102+
VersionUpgradePolicy upgrade_policy,
103+
std::optional<std::chrono::seconds> heartbeat_interval,
104+
std::size_t buffer_size);
105+
96106
std::string DetermineGateway() const;
97107
std::uint64_t Authenticate();
98108
std::string DecodeChallenge();
@@ -115,11 +125,11 @@ class LiveBlocking {
115125
bool send_ts_out_;
116126
std::uint8_t version_{};
117127
VersionUpgradePolicy upgrade_policy_;
118-
std::chrono::seconds heartbeat_interval_;
128+
std::optional<std::chrono::seconds> heartbeat_interval_;
119129
detail::TcpClient client_;
120130
std::uint32_t sub_counter_{};
121131
std::vector<LiveSubscription> subscriptions_;
122-
detail::Buffer buffer_{};
132+
detail::Buffer buffer_;
123133
// Must be 8-byte aligned for records
124134
alignas(RecordHeader) std::array<std::byte, kMaxRecordLen> compat_buffer_{};
125135
std::uint64_t session_id_;

include/databento/live_threaded.hpp

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
#pragma once
22

33
#include <chrono>
4+
#include <cstdint>
45
#include <functional> // function
56
#include <memory> // unique_ptr
7+
#include <optional>
68
#include <string>
9+
#include <string_view>
710
#include <utility> // pair
811
#include <vector>
912

@@ -14,15 +17,17 @@
1417
#include "databento/timeseries.hpp" // MetadataCallback, RecordCallback
1518

1619
namespace databento {
20+
// Forward declaration
1721
class ILogReceiver;
22+
class LiveBuilder;
1823

1924
// A client for interfacing with Databento's real-time and intraday replay
2025
// market data API. This client provides a threaded event-driven API for
2126
// receiving the next record. Unlike Historical, each instance of LiveThreaded
2227
// is associated with a particular dataset.
2328
class LiveThreaded {
2429
public:
25-
enum class ExceptionAction {
30+
enum class ExceptionAction : std::uint8_t {
2631
// Start a new session. Return this instead of calling `Start`, which would
2732
// cause a deadlock.
2833
Restart,
@@ -32,13 +37,6 @@ class LiveThreaded {
3237
using ExceptionCallback =
3338
std::function<ExceptionAction(const std::exception&)>;
3439

35-
LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset,
36-
bool send_ts_out, VersionUpgradePolicy upgrade_policy,
37-
std::chrono::seconds heartbeat_interval);
38-
LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset,
39-
std::string gateway, std::uint16_t port, bool send_ts_out,
40-
VersionUpgradePolicy upgrade_policy,
41-
std::chrono::seconds heartbeat_interval);
4240
LiveThreaded(const LiveThreaded&) = delete;
4341
LiveThreaded& operator=(const LiveThreaded&) = delete;
4442
LiveThreaded(LiveThreaded&& other) noexcept;
@@ -57,7 +55,7 @@ class LiveThreaded {
5755
VersionUpgradePolicy UpgradePolicy() const;
5856
// The the first member of the pair will be true, when the heartbeat interval
5957
// was overridden.
60-
std::pair<bool, std::chrono::seconds> HeartbeatInterval() const;
58+
std::optional<std::chrono::seconds> HeartbeatInterval() const;
6159
const std::vector<LiveSubscription>& Subscriptions() const;
6260
std::vector<LiveSubscription>& Subscriptions();
6361

@@ -96,15 +94,27 @@ class LiveThreaded {
9694
KeepGoing BlockForStop(std::chrono::milliseconds timeout);
9795

9896
private:
97+
friend LiveBuilder;
98+
9999
struct Impl;
100100

101101
static void ProcessingThread(Impl* impl, MetadataCallback&& metadata_callback,
102102
RecordCallback&& record_callback,
103103
ExceptionCallback&& exception_callback);
104104
static ExceptionAction ExceptionHandler(
105105
Impl* impl, const ExceptionCallback& exception_callback,
106-
const std::exception& exc, const char* pretty_function_name,
107-
const char* message);
106+
const std::exception& exc, std::string_view pretty_function_name,
107+
std::string_view message);
108+
109+
LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset,
110+
bool send_ts_out, VersionUpgradePolicy upgrade_policy,
111+
std::optional<std::chrono::seconds> heartbeat_interval,
112+
std::size_t buffer_size);
113+
LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset,
114+
std::string gateway, std::uint16_t port, bool send_ts_out,
115+
VersionUpgradePolicy upgrade_policy,
116+
std::optional<std::chrono::seconds> heartbeat_interval,
117+
std::size_t buffer_size);
108118

109119
// unique_ptr to be movable
110120
std::unique_ptr<Impl> impl_;

live.hpp

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
#pragma once
2+
3+
#include <chrono>
4+
#include <cstddef>
5+
#include <string>
6+
7+
#include "databento/enums.hpp" // VersionUpgradePolicy
8+
#include "databento/live_blocking.hpp"
9+
#include "databento/live_threaded.hpp"
10+
#include "databento/publishers.hpp"
11+
12+
namespace databento {
13+
// Forward declarations
14+
class ILogReceiver;
15+
16+
// A helper class for constructing a Live client, either an instance of
17+
// LiveBlocking or LiveThreaded.
18+
class LiveBuilder {
19+
public:
20+
LiveBuilder();
21+
22+
/*
23+
* Required settters
24+
*/
25+
26+
// Sets `key_` based on the environment variable DATABENTO_API_KEY.
27+
//
28+
// NOTE: This is not thread-safe if `std::setenv` is used elsewhere in the
29+
// program.
30+
LiveBuilder& SetKeyFromEnv();
31+
LiveBuilder& SetKey(std::string key);
32+
LiveBuilder& SetDataset(Dataset dataset);
33+
LiveBuilder& SetDataset(std::string dataset);
34+
35+
/*
36+
* Optional settters
37+
*/
38+
39+
// Whether to append the gateway send timestamp after each DBN message.
40+
LiveBuilder& SetSendTsOut(bool send_ts_out);
41+
// Set the version upgrade policy for when receiving DBN data from a prior
42+
// version. Defaults to upgrading to DBNv2 (if not already).
43+
LiveBuilder& SetUpgradePolicy(VersionUpgradePolicy upgrade_policy);
44+
// Sets the receiver of the logs to be used by the client.
45+
LiveBuilder& SetLogReceiver(ILogReceiver* log_receiver);
46+
// Overrides the heartbeat interval.
47+
LiveBuilder& SetHeartbeatInterval(std::chrono::seconds heartbeat_interval);
48+
// Overrides the gateway and port. This is an advanced method.
49+
LiveBuilder& SetAddress(std::string gateway, std::uint16_t port);
50+
// Overrides the size of the buffer used for reading data from the TCP socket.
51+
LiveBuilder& SetBufferSize(std::size_t size);
52+
53+
/*
54+
* Build a live client instance
55+
*/
56+
57+
// Attempts to construct an instance of a blocking live client or throws an
58+
// exception.
59+
LiveBlocking BuildBlocking();
60+
// Attempts to construct an instance of a threaded live client or throws an
61+
// exception.
62+
LiveThreaded BuildThreaded();
63+
64+
private:
65+
void Validate();
66+
67+
ILogReceiver* log_receiver_{};
68+
std::string gateway_{};
69+
std::uint16_t port_{};
70+
std::string key_;
71+
std::string dataset_;
72+
73+
bool send_ts_out_{false};
74+
VersionUpgradePolicy upgrade_policy_{VersionUpgradePolicy::UpgradeToV3};
75+
std::optional<std::chrono::seconds> heartbeat_interval_{};
76+
std::size_t buffer_size_;
77+
};
78+
} // namespace databento

0 commit comments

Comments
 (0)