Skip to content

Commit 00227ce

Browse files
authored
Merge pull request #613 from splitio/FME-12223-sdk-events-segments
Fme 12223 sdk events segments
2 parents 0989330 + b00410d commit 00227ce

File tree

12 files changed

+152
-81
lines changed

12 files changed

+152
-81
lines changed

splitio/client/factory.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -551,8 +551,8 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
551551
events_queue = queue.Queue()
552552
storages = {
553553
'splits': InMemorySplitStorage(events_queue, cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []),
554-
'segments': InMemorySegmentStorage(),
555-
'rule_based_segments': InMemoryRuleBasedSegmentStorage(),
554+
'segments': InMemorySegmentStorage(events_queue),
555+
'rule_based_segments': InMemoryRuleBasedSegmentStorage(events_queue),
556556
'impressions': InMemoryImpressionStorage(cfg['impressionsQueueSize'], telemetry_runtime_producer),
557557
'events': InMemoryEventStorage(cfg['eventsQueueSize'], telemetry_runtime_producer),
558558
}
@@ -1101,8 +1101,8 @@ def _build_localhost_factory(cfg):
11011101
events_queue = queue.Queue()
11021102
storages = {
11031103
'splits': InMemorySplitStorage(events_queue, cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []),
1104-
'segments': InMemorySegmentStorage(), # not used, just to avoid possible future errors.
1105-
'rule_based_segments': InMemoryRuleBasedSegmentStorage(),
1104+
'segments': InMemorySegmentStorage(events_queue), # not used, just to avoid possible future errors.
1105+
'rule_based_segments': InMemoryRuleBasedSegmentStorage(events_queue),
11061106
'impressions': LocalhostImpressionsStorage(),
11071107
'events': LocalhostEventsStorage(),
11081108
}

splitio/storage/inmemmory.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,12 @@ def remove_flag_set(self, flag_sets, feature_flag_name, should_filter):
113113
class InMemoryRuleBasedSegmentStorage(RuleBasedSegmentsStorage):
114114
"""InMemory implementation of a feature flag storage base."""
115115

116-
def __init__(self):
116+
def __init__(self, internal_event_queue):
117117
"""Constructor."""
118118
self._lock = threading.RLock()
119119
self._rule_based_segments = {}
120120
self._change_number = -1
121+
self._internal_event_queue = internal_event_queue
121122

122123
def clear(self):
123124
"""
@@ -153,6 +154,10 @@ def update(self, to_add, to_delete, new_change_number):
153154
[self._put(add_segment) for add_segment in to_add]
154155
[self._remove(delete_segment) for delete_segment in to_delete]
155156
self._set_change_number(new_change_number)
157+
self._internal_event_queue.put(
158+
SdkInternalEventNotification(
159+
SdkInternalEvent.RB_SEGMENTS_UPDATED,
160+
EventsMetadata(SdkEventType.SEGMENT_UPDATE, {})))
156161

157162
def _put(self, rule_based_segment):
158163
"""
@@ -934,11 +939,12 @@ async def is_flag_set_exist(self, flag_set):
934939
class InMemorySegmentStorage(SegmentStorage):
935940
"""In-memory implementation of a segment storage."""
936941

937-
def __init__(self):
942+
def __init__(self, internal_event_queue):
938943
"""Constructor."""
939944
self._segments = {}
940945
self._change_numbers = {}
941946
self._lock = threading.RLock()
947+
self._internal_event_queue = internal_event_queue
942948

943949
def get(self, segment_name):
944950
"""
@@ -968,9 +974,14 @@ def put(self, segment):
968974
with self._lock:
969975
self._segments[segment.name] = segment
970976

977+
self._internal_event_queue.put(
978+
SdkInternalEventNotification(
979+
SdkInternalEvent.SEGMENTS_UPDATED,
980+
EventsMetadata(SdkEventType.SEGMENT_UPDATE, {})))
981+
971982
def update(self, segment_name, to_add, to_remove, change_number=None):
972983
"""
973-
Update a feature flag. Create it if it doesn't exist.
984+
Update a segment. Create it if it doesn't exist.
974985
975986
:param segment_name: Name of the segment to update.
976987
:type segment_name: str
@@ -988,6 +999,11 @@ def update(self, segment_name, to_add, to_remove, change_number=None):
988999
if change_number is not None:
9891000
self._segments[segment_name].change_number = change_number
9901001

1002+
self._internal_event_queue.put(
1003+
SdkInternalEventNotification(
1004+
SdkInternalEvent.SEGMENTS_UPDATED,
1005+
EventsMetadata(SdkEventType.SEGMENT_UPDATE, {})))
1006+
9911007
def get_change_number(self, segment_name):
9921008
"""
9931009
Retrieve latest change number for a segment.
@@ -1100,7 +1116,7 @@ async def put(self, segment):
11001116

11011117
async def update(self, segment_name, to_add, to_remove, change_number=None):
11021118
"""
1103-
Update a feature flag. Create it if it doesn't exist.
1119+
Update a segment. Create it if it doesn't exist.
11041120
11051121
:param segment_name: Name of the segment to update.
11061122
:type segment_name: str

tests/client/test_client.py

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ def test_get_treatment(self, mocker):
3939
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
4040
events_queue = queue.Queue()
4141
split_storage = InMemorySplitStorage(events_queue)
42-
segment_storage = InMemorySegmentStorage()
43-
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
42+
segment_storage = InMemorySegmentStorage(events_queue)
43+
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
4444
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
4545
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
4646
event_storage = mocker.Mock(spec=EventStorage)
@@ -117,8 +117,8 @@ def test_get_treatment_with_config(self, mocker):
117117
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
118118
events_queue = queue.Queue()
119119
split_storage = InMemorySplitStorage(events_queue)
120-
segment_storage = InMemorySegmentStorage()
121-
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
120+
segment_storage = InMemorySegmentStorage(events_queue)
121+
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
122122
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
123123
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
124124
impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer)
@@ -195,8 +195,8 @@ def test_get_treatments(self, mocker):
195195
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
196196
events_queue = queue.Queue()
197197
split_storage = InMemorySplitStorage(events_queue)
198-
segment_storage = InMemorySegmentStorage()
199-
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
198+
segment_storage = InMemorySegmentStorage(events_queue)
199+
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
200200
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
201201
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
202202
impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer)
@@ -276,8 +276,8 @@ def test_get_treatments_by_flag_set(self, mocker):
276276
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
277277
events_queue = queue.Queue()
278278
split_storage = InMemorySplitStorage(events_queue)
279-
segment_storage = InMemorySegmentStorage()
280-
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
279+
segment_storage = InMemorySegmentStorage(events_queue)
280+
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
281281
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
282282
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
283283
impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer)
@@ -356,8 +356,8 @@ def test_get_treatments_by_flag_sets(self, mocker):
356356
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
357357
events_queue = queue.Queue()
358358
split_storage = InMemorySplitStorage(events_queue)
359-
segment_storage = InMemorySegmentStorage()
360-
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
359+
segment_storage = InMemorySegmentStorage(events_queue)
360+
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
361361
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
362362
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
363363
impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer)
@@ -436,8 +436,8 @@ def test_get_treatments_with_config(self, mocker):
436436
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
437437
events_queue = queue.Queue()
438438
split_storage = InMemorySplitStorage(events_queue)
439-
segment_storage = InMemorySegmentStorage()
440-
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
439+
segment_storage = InMemorySegmentStorage(events_queue)
440+
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
441441
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
442442
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
443443
impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer)
@@ -520,8 +520,8 @@ def test_get_treatments_with_config_by_flag_set(self, mocker):
520520
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
521521
events_queue = queue.Queue()
522522
split_storage = InMemorySplitStorage(events_queue)
523-
segment_storage = InMemorySegmentStorage()
524-
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
523+
segment_storage = InMemorySegmentStorage(events_queue)
524+
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
525525
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
526526
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
527527
impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer)
@@ -601,8 +601,8 @@ def test_get_treatments_with_config_by_flag_sets(self, mocker):
601601
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
602602
events_queue = queue.Queue()
603603
split_storage = InMemorySplitStorage(events_queue)
604-
segment_storage = InMemorySegmentStorage()
605-
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
604+
segment_storage = InMemorySegmentStorage(events_queue)
605+
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
606606
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
607607
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
608608
impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer)
@@ -682,8 +682,8 @@ def test_impression_toggle_optimized(self, mocker):
682682
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
683683
events_queue = queue.Queue()
684684
split_storage = InMemorySplitStorage(events_queue)
685-
segment_storage = InMemorySegmentStorage()
686-
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
685+
segment_storage = InMemorySegmentStorage(events_queue)
686+
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
687687
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
688688
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
689689
event_storage = mocker.Mock(spec=EventStorage)
@@ -747,8 +747,8 @@ def test_impression_toggle_debug(self, mocker):
747747
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
748748
events_queue = queue.Queue()
749749
split_storage = InMemorySplitStorage(events_queue)
750-
segment_storage = InMemorySegmentStorage()
751-
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
750+
segment_storage = InMemorySegmentStorage(events_queue)
751+
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
752752
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
753753
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
754754
event_storage = mocker.Mock(spec=EventStorage)
@@ -812,8 +812,8 @@ def test_impression_toggle_none(self, mocker):
812812
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
813813
events_queue = queue.Queue()
814814
split_storage = InMemorySplitStorage(events_queue)
815-
segment_storage = InMemorySegmentStorage()
816-
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
815+
segment_storage = InMemorySegmentStorage(events_queue)
816+
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
817817
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
818818
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
819819
event_storage = mocker.Mock(spec=EventStorage)
@@ -953,8 +953,8 @@ def test_evaluations_before_running_post_fork(self, mocker):
953953
impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer)
954954
events_queue = queue.Queue()
955955
split_storage = InMemorySplitStorage(events_queue)
956-
segment_storage = InMemorySegmentStorage()
957-
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
956+
segment_storage = InMemorySegmentStorage(events_queue)
957+
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
958958
split_storage.update([from_raw(splits_json['splitChange1_1']['ff']['d'][0])], [], -1)
959959
destroyed_property = mocker.PropertyMock()
960960
destroyed_property.return_value = False
@@ -1035,8 +1035,8 @@ def test_telemetry_not_ready(self, mocker):
10351035
impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer)
10361036
events_queue = queue.Queue()
10371037
split_storage = InMemorySplitStorage(events_queue)
1038-
segment_storage = InMemorySegmentStorage()
1039-
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
1038+
segment_storage = InMemorySegmentStorage(events_queue)
1039+
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
10401040
split_storage.update([from_raw(splits_json['splitChange1_1']['ff']['d'][0])], [], -1)
10411041
recorder = StandardRecorder(impmanager, mocker.Mock(), mocker.Mock(), telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer())
10421042
factory = SplitFactory('localhost',
@@ -1071,7 +1071,7 @@ def test_telemetry_record_treatment_exception(self, mocker):
10711071
split_storage = InMemorySplitStorage(events_queue)
10721072
split_storage.update([from_raw(splits_json['splitChange1_1']['ff']['d'][0])], [], -1)
10731073
segment_storage = mocker.Mock(spec=SegmentStorage)
1074-
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
1074+
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
10751075
impression_storage = mocker.Mock(spec=ImpressionStorage)
10761076
event_storage = mocker.Mock(spec=EventStorage)
10771077
destroyed_property = mocker.PropertyMock()
@@ -1175,8 +1175,8 @@ def test_telemetry_method_latency(self, mocker):
11751175
impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer)
11761176
events_queue = queue.Queue()
11771177
split_storage = InMemorySplitStorage(events_queue)
1178-
segment_storage = InMemorySegmentStorage()
1179-
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
1178+
segment_storage = InMemorySegmentStorage(events_queue)
1179+
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
11801180
split_storage.update([from_raw(splits_json['splitChange1_1']['ff']['d'][0])], [], -1)
11811181
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer())
11821182
destroyed_property = mocker.PropertyMock()
@@ -1288,8 +1288,8 @@ def test_impressions_properties(self, mocker):
12881288
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
12891289
events_queue = queue.Queue()
12901290
split_storage = InMemorySplitStorage(events_queue)
1291-
segment_storage = InMemorySegmentStorage()
1292-
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
1291+
segment_storage = InMemorySegmentStorage(events_queue)
1292+
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
12931293
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
12941294
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
12951295
event_storage = mocker.Mock(spec=EventStorage)

tests/engine/test_evaluator.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -264,8 +264,8 @@ def test_evaluate_treatment_with_rbs_in_condition(self):
264264
e = evaluator.Evaluator(splitters.Splitter())
265265
events_queue = queue.Queue()
266266
splits_storage = InMemorySplitStorage(events_queue)
267-
rbs_storage = InMemoryRuleBasedSegmentStorage()
268-
segment_storage = InMemorySegmentStorage()
267+
rbs_storage = InMemoryRuleBasedSegmentStorage(events_queue)
268+
segment_storage = InMemorySegmentStorage(events_queue)
269269
evaluation_facctory = EvaluationDataFactory(splits_storage, segment_storage, rbs_storage)
270270

271271
rbs_segments = os.path.join(os.path.dirname(__file__), 'files', 'rule_base_segments.json')
@@ -291,8 +291,8 @@ def test_using_segment_in_excluded(self):
291291
e = evaluator.Evaluator(splitters.Splitter())
292292
events_queue = queue.Queue()
293293
splits_storage = InMemorySplitStorage(events_queue)
294-
rbs_storage = InMemoryRuleBasedSegmentStorage()
295-
segment_storage = InMemorySegmentStorage()
294+
rbs_storage = InMemoryRuleBasedSegmentStorage(events_queue)
295+
segment_storage = InMemorySegmentStorage(events_queue)
296296
evaluation_facctory = EvaluationDataFactory(splits_storage, segment_storage, rbs_storage)
297297

298298
mocked_split = Split('some', 12345, False, 'off', 'user', Status.ACTIVE, 12, split_conditions, 1.2, 100, 1234, {}, None, False, [])
@@ -316,8 +316,8 @@ def test_using_rbs_in_excluded(self):
316316
e = evaluator.Evaluator(splitters.Splitter())
317317
events_queue = queue.Queue()
318318
splits_storage = InMemorySplitStorage(events_queue)
319-
rbs_storage = InMemoryRuleBasedSegmentStorage()
320-
segment_storage = InMemorySegmentStorage()
319+
rbs_storage = InMemoryRuleBasedSegmentStorage(events_queue)
320+
segment_storage = InMemorySegmentStorage(events_queue)
321321
evaluation_facctory = EvaluationDataFactory(splits_storage, segment_storage, rbs_storage)
322322

323323
mocked_split = Split('some', 12345, False, 'off', 'user', Status.ACTIVE, 12, split_conditions, 1.2, 100, 1234, {}, None, False, [])
@@ -340,8 +340,8 @@ def test_prerequisites(self):
340340
e = evaluator.Evaluator(splitters.Splitter())
341341
events_queue = queue.Queue()
342342
splits_storage = InMemorySplitStorage(events_queue)
343-
rbs_storage = InMemoryRuleBasedSegmentStorage()
344-
segment_storage = InMemorySegmentStorage()
343+
rbs_storage = InMemoryRuleBasedSegmentStorage(events_queue)
344+
segment_storage = InMemorySegmentStorage(events_queue)
345345
evaluation_facctory = EvaluationDataFactory(splits_storage, segment_storage, rbs_storage)
346346

347347
rbs = rule_based_segments.from_raw(data["rbs"]["d"][0])
@@ -549,8 +549,8 @@ def test_get_context(self):
549549
split2 = Split('split2', 12345, False, 'off', 'user', Status.ACTIVE, 12, split_conditions, 1.2, 100, 1234, {}, None, False, [])
550550
events_queue = queue.Queue()
551551
flag_storage = InMemorySplitStorage(events_queue, [])
552-
segment_storage = InMemorySegmentStorage()
553-
rbs_segment_storage = InMemoryRuleBasedSegmentStorage()
552+
segment_storage = InMemorySegmentStorage(events_queue)
553+
rbs_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
554554
flag_storage.update([mocked_split, split2], [], -1)
555555
rbs = copy.deepcopy(rbs_raw)
556556
rbs['conditions'].append(

0 commit comments

Comments
 (0)