Skip to content

Commit d58461d

Browse files
authored
Merge pull request #95 from trubrics/feature/flush_limit
Added hard limit to flush in SDK
2 parents 69557e4 + aa92d8e commit d58461d

File tree

6 files changed

+76
-20
lines changed

6 files changed

+76
-20
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
## [1.8.2] - 2025-02-10
11+
### Fixed
12+
- Added missing flushing limit values, and setup automated flush if limit is reached
13+
1014

1115
## [1.8.1] - 2025-02-07
1216
### Fixed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ client = Trubrics(
7979
```
8080

8181
- flush_interval: Time in seconds between automatic flushes (default: 10)
82-
- flush_at: Number of events that trigger a flush (default: 20)
82+
- flush_batch_size: Number of events that trigger a flush (default: 20)
8383

8484
## Development
8585

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "trubrics"
3-
version = "1.8.1"
3+
version = "1.8.2"
44
requires-python = ">=3.10"
55
license = { text = "Apache 2.0" }
66
authors = [

trubrics/config.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
MIN_FLUSH_INTERVAL = 1
2+
MAX_FLUSH_BATCH_SIZE = 100
3+
DEFAULT_FLUSH_INTERVAL = 10
4+
DEFAULT_FLUSH_BATCH_SIZE = 20
5+
DEFAULT_FLUSH_PERIODIC_CHECK = 0.1

trubrics/logger.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@
88
)
99
handler.setFormatter(formatter)
1010
trubrics_logger.addHandler(handler)
11-
trubrics_logger.setLevel(logging.ERROR)
11+
trubrics_logger.setLevel(logging.INFO)

trubrics/main.py

Lines changed: 64 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,13 @@
66

77
import requests
88

9+
from trubrics.config import (
10+
DEFAULT_FLUSH_BATCH_SIZE,
11+
DEFAULT_FLUSH_INTERVAL,
12+
DEFAULT_FLUSH_PERIODIC_CHECK,
13+
MAX_FLUSH_BATCH_SIZE,
14+
MIN_FLUSH_INTERVAL,
15+
)
916
from trubrics.logger import trubrics_logger
1017

1118

@@ -14,22 +21,60 @@ def __init__(
1421
self,
1522
api_key: str,
1623
host: str = "https://app.trubrics.com/api/ingestion",
17-
flush_interval: int = 10,
18-
flush_at: int = 20,
24+
flush_interval: float = DEFAULT_FLUSH_INTERVAL,
25+
flush_batch_size: int = DEFAULT_FLUSH_BATCH_SIZE,
26+
flush_periodic_check: float = DEFAULT_FLUSH_PERIODIC_CHECK,
1927
logger: logging.Logger = trubrics_logger,
2028
):
29+
f"""
30+
Initialize the Trubrics client.
31+
Args:
32+
api_key (str): The API key for the Trubrics account.
33+
host (str): The host URL for the Trubrics API.
34+
flush_interval (int): The interval in seconds after which events should be flushed. Minimum possible value is {MIN_FLUSH_INTERVAL}.
35+
flush_batch_size (int): The number of events to flush at a time. Max possible value is {MAX_FLUSH_BATCH_SIZE}.
36+
flush_periodic_check (float): The interval in seconds between periodic flush checks.
37+
logger (logging.Logger): The logger to use for logging.
38+
39+
"""
2140
self.host = host
2241
self.api_key = api_key
2342
self.queue: list[dict] = []
24-
self.flush_interval = flush_interval
25-
self.flush_at = flush_at
2643
self.last_flush_time = datetime.now(timezone.utc)
2744
self.is_flushing = False
2845
self._lock = threading.Lock()
2946
self._stop_event = threading.Event()
47+
self.logger = logger
48+
49+
self._init_flush_parameters(
50+
flush_interval, flush_batch_size, flush_periodic_check
51+
)
3052
self._thread = threading.Thread(target=self._periodic_flush, daemon=True)
3153
self._thread.start()
32-
self.logger = logger
54+
55+
def _init_flush_parameters(
56+
self, flush_interval: float, flush_batch_size: int, flush_periodic_check: float
57+
):
58+
if flush_interval < MIN_FLUSH_INTERVAL:
59+
self.logger.warning(
60+
f"Flush interval {flush_interval} is too low. Setting it to minimum allowed value of {MIN_FLUSH_INTERVAL}."
61+
)
62+
flush_interval = MIN_FLUSH_INTERVAL
63+
if flush_batch_size > MAX_FLUSH_BATCH_SIZE:
64+
self.logger.warning(
65+
f"Flush batch size {flush_batch_size} is too high. Setting to maximum allowed value of {MAX_FLUSH_BATCH_SIZE}."
66+
)
67+
flush_batch_size = MAX_FLUSH_BATCH_SIZE
68+
if flush_periodic_check > flush_interval:
69+
self.logger.warning(
70+
f"Periodic flush check interval {flush_periodic_check} is higher than defined \
71+
flush interval period. Setting it to Flush interval period {flush_interval}."
72+
)
73+
flush_periodic_check = flush_interval
74+
75+
self.flush_interval = flush_interval
76+
self.flush_batch_size = flush_batch_size
77+
self.flush_periodic_check = flush_periodic_check
3378

3479
def track(
3580
self,
@@ -47,10 +92,6 @@ def track(
4792
timestamp (datetime | None): The timestamp of event. If None, the current time in UTC is used. If not a datetime object, the event is ignored.
4893
"""
4994

50-
if timestamp and not isinstance(timestamp, datetime):
51-
self.logger.error("Timestamp must be a datetime object. Ignoring event.")
52-
return
53-
5495
event_dict = {
5596
"user_id": user_id,
5697
"event": event,
@@ -124,14 +165,19 @@ def flush(self):
124165
self.queue.clear()
125166

126167
if events:
127-
success = self._post(events)
128-
self.last_flush_time = datetime.now(timezone.utc)
168+
for batch_id in range(0, len(events), self.flush_batch_size):
169+
batch = events[batch_id : batch_id + self.flush_batch_size]
170+
success = self._post(batch)
129171

130-
if not success:
131-
self.logger.info(f"Retrying flush of {queue_len} events.")
132-
time.sleep(5)
172+
if not success:
173+
self.logger.warning(
174+
f"Retrying flush of batch {batch_id} of {len(batch)} events."
175+
)
176+
time.sleep(5)
177+
self._post(batch)
133178

134-
self._post(events)
179+
self.last_flush_time = datetime.now(timezone.utc)
180+
self.logger.info(f"Flush of {len(events)} events completed.")
135181

136182
with self._lock:
137183
self.is_flushing = False
@@ -177,13 +223,14 @@ def _post(self, events: list[dict]):
177223

178224
def _periodic_flush(self):
179225
while not self._stop_event.is_set():
180-
time.sleep(1)
226+
time.sleep(self.flush_periodic_check)
181227

182228
queue_len = len(self.queue)
183229
now = datetime.now(timezone.utc)
184230
time_since_last_flush = (now - self.last_flush_time).total_seconds()
185231
if (
186-
queue_len >= self.flush_at
232+
queue_len >= self.flush_batch_size
187233
or time_since_last_flush >= self.flush_interval
188234
):
235+
self.logger.debug(f"Time since last flush: {time_since_last_flush}")
189236
self.flush()

0 commit comments

Comments
 (0)