1+ import random
2+ import threading
3+ import time
14from collections import defaultdict
25from datetime import datetime , timezone
36import os
4- import threading
57from typing import TYPE_CHECKING
68import weakref
79
1618
class SpanBatcher(Batcher["StreamedSpan"]):
    """Buffers streamed spans per trace (bucket) and flushes them to the
    transport as span envelopes."""

    # MAX_BEFORE_FLUSH should be lower than MAX_BEFORE_DROP, so that there is
    # a bit of a buffer for spans that appear between the trigger to flush
    # and actually flushing the buffer.
    #
    # The max limits are all per trace (per bucket).
    MAX_ENVELOPE_SIZE = 1000  # spans
    MAX_BEFORE_FLUSH = 1000
    MAX_BEFORE_DROP = 2000
    MAX_BYTES_BEFORE_FLUSH = 5 * 1024 * 1024  # 5 MB

    # Base interval (seconds) between periodic full flushes; the flusher
    # thread adds up to 10% random jitter on top of this.
    FLUSH_WAIT_TIME = 5.0

    # Envelope item type used when serializing buffered spans.
    TYPE = "span"
@@ -47,7 +50,9 @@ def __init__(
4750 self ._lock = threading .Lock ()
4851 self ._active : "threading.local" = threading .local ()
4952
50- self ._flush_event : "threading.Event" = threading .Event ()
53+ self ._last_full_flush : float = time .monotonic () # drives time-based flushes
54+ self ._flush_event = threading .Event ()
55+ self ._pending_flush : set [str ] = set () # buckets to be flushed
5156
5257 self ._flusher : "Optional[threading.Thread]" = None
5358 self ._flusher_pid : "Optional[int]" = None
@@ -71,11 +76,29 @@ def _reset_thread_state(self) -> None:
7176 self ._lock = threading .Lock ()
7277 self ._active = threading .local ()
7378
79+ self ._last_full_flush = time .monotonic ()
7480 self ._flush_event = threading .Event ()
81+ self ._pending_flush = set ()
7582
7683 self ._flusher = None
7784 self ._flusher_pid = None
7885
86+ def _flush_loop (self ) -> None :
87+ self ._active .flag = True
88+ while self ._running :
89+ jitter = random .random () * self .FLUSH_WAIT_TIME * 0.1
90+ self ._flush_event .wait (timeout = self .FLUSH_WAIT_TIME + jitter )
91+ self ._flush_event .clear ()
92+
93+ self ._flush (only_pending = True )
94+
95+ if (
96+ time .monotonic () - self ._last_full_flush
97+ >= self .FLUSH_WAIT_TIME + jitter
98+ ):
99+ self ._flush ()
100+ self ._last_full_flush = time .monotonic ()
101+
79102 def add (self , span : "StreamedSpan" ) -> None :
80103 # Bail out if the current thread is already executing batcher code.
81104 # This prevents deadlocks when code running inside the batcher (e.g.
@@ -104,13 +127,17 @@ def add(self, span: "StreamedSpan") -> None:
104127 self ._span_buffer [span .trace_id ].append (span )
105128 self ._running_size [span .trace_id ] += self ._estimate_size (span )
106129
107- if size + 1 >= self .MAX_BEFORE_FLUSH :
108- self ._flush_event .set ()
109- return
110-
111- if self ._running_size [span .trace_id ] >= self .MAX_BYTES_BEFORE_FLUSH :
112- self ._flush_event .set ()
113- return
130+ if (
131+ size + 1 >= self .MAX_BEFORE_FLUSH
132+ or self ._running_size [span .trace_id ] >= self .MAX_BYTES_BEFORE_FLUSH
133+ ):
134+ self ._pending_flush .add (span .trace_id )
135+ notify = True
136+ else :
137+ notify = False
138+
139+ if notify :
140+ self ._flush_event .set ()
114141 finally :
115142 self ._active .flag = False
116143
@@ -154,50 +181,62 @@ def _to_transport_format(item: "StreamedSpan") -> "Any":
154181
155182 return res
156183
    def _flush(self, only_pending: bool = False) -> None:
        """Serialize buffered spans into envelopes and hand them to the
        transport via ``self._capture_func``.

        :param only_pending: if True, flush only the buckets (trace ids)
            recorded in ``self._pending_flush`` because they crossed a size
            limit; if False, flush every bucket in the buffer.
        """
        with self._lock:
            if only_pending:
                buckets = list(self._pending_flush)
            else:
                # flush whole buffer, e.g. if the SDK is shutting down
                buckets = list(self._span_buffer.keys())

            # The pending set is consumed either way: a full flush covers all
            # pending buckets as well.
            self._pending_flush.clear()

            if not buckets:
                return

            envelopes = []

            for bucket_id in buckets:
                # The bucket may have been emptied/removed meanwhile; .get()
                # avoids materializing a new (presumably defaultdict) entry.
                spans = self._span_buffer.get(bucket_id)
                if not spans:
                    continue

                # All spans of one trace share a dynamic sampling context;
                # take it from the first span of the bucket.
                dsc = spans[0]._dynamic_sampling_context()

                # Max per envelope is 1000, so if we happen to have more than
                # 1000 spans in one bucket, we'll need to separate them.
                for start in range(0, len(spans), self.MAX_ENVELOPE_SIZE):
                    end = min(start + self.MAX_ENVELOPE_SIZE, len(spans))

                    envelope = Envelope(
                        headers={
                            "sent_at": format_timestamp(datetime.now(timezone.utc)),
                            "trace": dsc,
                        }
                    )

                    envelope.add_item(
                        Item(
                            type=self.TYPE,
                            content_type=self.CONTENT_TYPE,
                            headers={
                                "item_count": end - start,
                            },
                            payload=PayloadRef(
                                json={
                                    "items": [
                                        self._to_transport_format(spans[j])
                                        for j in range(start, end)
                                    ]
                                }
                            ),
                        )
                    )

                    envelopes.append(envelope)

                # Drop the flushed bucket and its running byte count.
                del self._span_buffer[bucket_id]
                del self._running_size[bucket_id]

        # Hand envelopes to the transport outside the lock so the capture
        # call cannot block add().
        # NOTE(review): placement reconstructed from a diff view — confirm
        # this loop sits outside the `with self._lock:` block upstream.
        for envelope in envelopes:
            self._capture_func(envelope)
0 commit comments