Skip to content

Commit f4097d5

Browse files
authored
Merge pull request #3 from LogFlow-AI/feature/improve-ddsketch-efficiency
Feature/improve ddsketch efficiency
2 parents 74bbb16 + 8f8f23f commit f4097d5

13 files changed

Lines changed: 1134 additions & 188 deletions

File tree

QuantileFlow/ddsketch/core.py

Lines changed: 91 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
"""Core DDSketch implementation."""
1+
"""Core DDSketch implementation.
2+
3+
Optimized for high throughput with efficient bucket indexing and quantile queries.
4+
"""
25

36
from typing import Literal, Union
47
from .mapping.logarithmic import LogarithmicMapping
@@ -8,6 +11,7 @@
811
from .storage.contiguous import ContiguousStorage
912
from .storage.sparse import SparseStorage
1013

14+
1115
class DDSketch:
1216
"""
1317
DDSketch implementation for quantile approximation with relative-error guarantees.
@@ -21,6 +25,9 @@ class DDSketch:
2125
by Charles Masson, Jee E. Rim and Homin K. Lee
2226
"""
2327

28+
__slots__ = ('relative_accuracy', 'cont_neg', 'mapping', 'positive_store',
29+
'negative_store', 'count', 'zero_count', '_min', '_max', '_sum')
30+
2431
def __init__(
2532
self,
2633
relative_accuracy: float,
@@ -54,7 +61,6 @@ def __init__(
5461
self.relative_accuracy = relative_accuracy
5562
self.cont_neg = cont_neg
5663

57-
5864
# Initialize mapping scheme
5965
if mapping_type == 'logarithmic':
6066
self.mapping = LogarithmicMapping(relative_accuracy)
@@ -71,31 +77,53 @@ def __init__(
7177
self.positive_store = SparseStorage(strategy=bucket_strategy)
7278
self.negative_store = SparseStorage(strategy=bucket_strategy) if cont_neg else None
7379

74-
self.count = 0
75-
self.zero_count = 0
80+
self.count = 0.0
81+
self.zero_count = 0.0
82+
83+
# Summary stats
84+
self._min = float('+inf')
85+
self._max = float('-inf')
86+
self._sum = 0.0
7687

77-
def insert(self, value: Union[int, float]) -> None:
88+
def insert(self, value: Union[int, float], weight: float = 1.0) -> None:
7889
"""
7990
Insert a value into the sketch.
8091
8192
Args:
8293
value: The value to insert.
94+
weight: The weight of the value (default 1.0).
8395
8496
Raises:
8597
ValueError: If value is negative and cont_neg is False.
8698
"""
99+
# Cache method lookups for hot path optimization
87100
if value > 0:
88-
bucket_idx = self.mapping.compute_bucket_index(value)
89-
self.positive_store.add(bucket_idx)
101+
# Most common case: positive values
102+
# Inline the hot path with cached local references
103+
compute_idx = self.mapping.compute_bucket_index
104+
self.positive_store.add(compute_idx(value), weight)
90105
elif value < 0:
91106
if self.cont_neg:
92-
bucket_idx = self.mapping.compute_bucket_index(-value)
93-
self.negative_store.add(bucket_idx)
107+
compute_idx = self.mapping.compute_bucket_index
108+
self.negative_store.add(compute_idx(-value), weight)
94109
else:
95110
raise ValueError("Negative values not supported when cont_neg is False")
96111
else:
97-
self.zero_count += 1
98-
self.count += 1
112+
self.zero_count += weight
113+
114+
# Track summary stats - combined update
115+
self.count += weight
116+
self._sum += value * weight
117+
# Update min/max - use local to avoid repeated attribute access
118+
if value < self._min:
119+
self._min = value
120+
if value > self._max:
121+
self._max = value
122+
123+
# Alias for API compatibility
124+
def add(self, value: Union[int, float], weight: float = 1.0) -> None:
125+
"""Alias for insert()."""
126+
self.insert(value, weight)
99127

100128
def delete(self, value: Union[int, float]) -> None:
101129
"""
@@ -125,6 +153,7 @@ def delete(self, value: Union[int, float]) -> None:
125153

126154
if deleted:
127155
self.count -= 1
156+
self._sum -= value
128157

129158
def quantile(self, q: float) -> float:
130159
"""
@@ -146,32 +175,52 @@ def quantile(self, q: float) -> float:
146175

147176
rank = q * (self.count - 1)
148177

149-
if self.cont_neg:
150-
neg_count = self.negative_store.total_count
178+
if self.cont_neg and self.negative_store is not None:
179+
neg_count = self.negative_store.count
151180
if rank < neg_count:
152-
# Handle negative values
153-
curr_count = 0
154-
if self.negative_store.min_index is not None:
155-
for idx in range(self.negative_store.max_index, self.negative_store.min_index - 1, -1):
156-
bucket_count = self.negative_store.get_count(idx)
157-
curr_count += bucket_count
158-
if curr_count > rank:
159-
return -self.mapping.compute_value_from_index(idx)
181+
# Handle negative values - use reversed rank
182+
reversed_rank = neg_count - rank - 1
183+
key = self.negative_store.key_at_rank(reversed_rank, lower=False)
184+
return -self.mapping.compute_value_from_index(key)
160185
rank -= neg_count
161186

162187
if rank < self.zero_count:
163-
return 0
188+
return 0.0
164189
rank -= self.zero_count
165190

166-
curr_count = 0
167-
if self.positive_store.min_index is not None:
168-
for idx in range(self.positive_store.min_index, self.positive_store.max_index + 1):
169-
bucket_count = self.positive_store.get_count(idx)
170-
curr_count += bucket_count
171-
if curr_count > rank:
172-
return self.mapping.compute_value_from_index(idx)
173-
174-
return float('inf')
191+
# Use key_at_rank for consistency with storage implementation
192+
key = self.positive_store.key_at_rank(rank)
193+
return self.mapping.compute_value_from_index(key)
194+
195+
# Alias for API compatibility
196+
def get_quantile_value(self, quantile: float) -> float:
197+
"""Alias for quantile()."""
198+
try:
199+
return self.quantile(quantile)
200+
except ValueError:
201+
return None
202+
203+
@property
204+
def avg(self) -> float:
205+
"""Return the exact average of values added to the sketch."""
206+
if self.count == 0:
207+
return 0.0
208+
return self._sum / self.count
209+
210+
@property
211+
def sum(self) -> float:
212+
"""Return the exact sum of values added to the sketch."""
213+
return self._sum
214+
215+
@property
216+
def min(self) -> float:
217+
"""Return the minimum value added to the sketch."""
218+
return self._min
219+
220+
@property
221+
def max(self) -> float:
222+
"""Return the maximum value added to the sketch."""
223+
return self._max
175224

176225
def merge(self, other: 'DDSketch') -> None:
177226
"""
@@ -185,12 +234,20 @@ def merge(self, other: 'DDSketch') -> None:
185234
"""
186235
if self.relative_accuracy != other.relative_accuracy:
187236
raise ValueError("Cannot merge sketches with different relative accuracies")
237+
238+
if other.count == 0:
239+
return
188240

189241
self.positive_store.merge(other.positive_store)
190-
if self.cont_neg and other.cont_neg:
242+
if self.cont_neg and other.cont_neg and other.negative_store is not None:
191243
self.negative_store.merge(other.negative_store)
192-
elif other.cont_neg and sum(other.negative_store.counts.values()) > 0:
244+
elif other.cont_neg and other.negative_store is not None and other.negative_store.count > 0:
193245
raise ValueError("Cannot merge sketch containing negative values when cont_neg is False")
194246

195247
self.zero_count += other.zero_count
196-
self.count += other.count
248+
self.count += other.count
249+
self._sum += other._sum
250+
if other._min < self._min:
251+
self._min = other._min
252+
if other._max > self._max:
253+
self._max = other._max

QuantileFlow/ddsketch/mapping/cubic_interpolation.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,5 @@
11
"""
2-
This file contains a Python implementation of the cubic interpolation mapping algorithm
3-
described in Datadog's Java DDSketch implementation (https://github.com/DataDog/sketches-java).
4-
5-
Original work Copyright 2021 Datadog, Inc.
6-
Licensed under Apache License 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
7-
2+
Cubic interpolation mapping scheme for DDSketch.
83
94
This implementation approximates the memory-optimal logarithmic mapping by:
105
1. Extracting the floor value of log2 from binary representation
@@ -34,8 +29,7 @@ def __init__(self, relative_accuracy: float):
3429

3530
# Multiplier m = 7/(10*log(2)) ≈ 1.01
3631
# This gives us the minimum multiplier that maintains relative accuracy guarantee
37-
# Divide by C as per Datadog's implementation
38-
self.m = 1/ (self.C * math.log(2))
32+
self.m = 1 / (self.C * math.log(2))
3933

4034
def _extract_exponent_and_significand(self, value: float) -> tuple[int, float]:
4135
"""
@@ -55,7 +49,7 @@ def _cubic_interpolation(self, s: float) -> float:
5549
Compute the cubic interpolation P(s) = As³ + Bs² + Cs
5650
where s is the normalized significand in [0, 1).
5751
"""
58-
# Use Datadog's order of operations for better numerical stability
52+
# Use Horner's method for better numerical stability
5953
return s * (self.C + s * (self.B + s * self.A))
6054

6155
def compute_bucket_index(self, value: float) -> int:

QuantileFlow/ddsketch/mapping/logarithmic.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,40 @@
33
import math
44
from .base import MappingScheme
55

6+
67
class LogarithmicMapping(MappingScheme):
8+
"""
9+
A memory-optimal KeyMapping that uses logarithmic mapping.
10+
11+
Given a targeted relative accuracy, it requires the least number of keys
12+
to cover a given range of values.
13+
"""
714
__slots__ = ('relative_accuracy', 'gamma', 'multiplier')
815

916
def __init__(self, relative_accuracy: float):
1017
self.relative_accuracy = relative_accuracy
1118
self.gamma = (1 + relative_accuracy) / (1 - relative_accuracy)
1219
self.multiplier = 1 / math.log(self.gamma)
20+
21+
def key(self, value: float) -> int:
22+
"""Alias for compute_bucket_index for API compatibility."""
23+
return self.compute_bucket_index(value)
24+
25+
def value(self, key: int) -> float:
26+
"""Alias for compute_value_from_index for API compatibility."""
27+
return self.compute_value_from_index(key)
1328

1429
def compute_bucket_index(self, value: float) -> int:
15-
# ceil(log_gamma(value) = ceil(log(value) / log(gamma))
30+
"""Compute the bucket index for a given value.
31+
32+
ceil(log_gamma(value)) = ceil(log(value) / log(gamma))
33+
"""
1634
return math.ceil(math.log(value) * self.multiplier)
1735

1836
def compute_value_from_index(self, index: int) -> float:
19-
# Return geometric mean of bucket boundaries
20-
# This ensures the relative error is bounded by relative_accuracy
37+
"""Compute the representative value for a given bucket index.
38+
39+
Returns the geometric mean of bucket boundaries to ensure
40+
the relative error is bounded by relative_accuracy.
41+
"""
2142
return math.pow(self.gamma, index) * (2.0 / (1.0 + self.gamma))

0 commit comments

Comments
 (0)