diff --git a/splitio/client/factory.py b/splitio/client/factory.py
index 71e88278..f5a4711b 100644
--- a/splitio/client/factory.py
+++ b/splitio/client/factory.py
@@ -1,4 +1,3 @@
-import pytest
 """A module for Split.io Factories."""
 import logging
 import threading
@@ -643,7 +642,8 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
     )
 
     telemetry_init_producer.record_config(cfg, extra_cfg, total_flag_sets, invalid_flag_sets)
-
+    internal_events_task.start()
+
     if preforked_initialization:
         synchronizer.sync_all(max_retry_attempts=_MAX_RETRY_SYNC_ALL)
         synchronizer._split_synchronizers._segment_sync.shutdown()
diff --git a/splitio/events/events_manager.py b/splitio/events/events_manager.py
index 54ba06e5..9457e24a 100644
--- a/splitio/events/events_manager.py
+++ b/splitio/events/events_manager.py
@@ -2,9 +2,9 @@
 import threading
 import logging
 from collections import namedtuple
-import pytest
 
 from splitio.events import EventsManagerInterface
+from splitio.models.events import SdkEvent
 
 _LOGGER = logging.getLogger(__name__)
 
@@ -25,10 +25,17 @@ def __init__(self, events_configurations, events_delivery):
         self._lock = threading.RLock()
 
     def register(self, sdk_event, event_handler):
-        if self._active_subscriptions.get(sdk_event) != None:
+        if self._active_subscriptions.get(sdk_event) != None and self._get_event_handler(sdk_event) != None:
             return
 
-
+        with self._lock:
+            # SDK ready already fired
+            if sdk_event == SdkEvent.SDK_READY and self._event_already_triggered(sdk_event):
+                self._active_subscriptions[sdk_event] = ActiveSubscriptions(True, event_handler)
+                _LOGGER.debug("EventsManager: Firing SDK_READY event for new subscription")
+                self._fire_sdk_event(sdk_event, None)
+                return
+
         self._active_subscriptions[sdk_event] = ActiveSubscriptions(False, event_handler)
 
     def unregister(self, sdk_event):
@@ -42,18 +49,27 @@ def notify_internal_event(self, sdk_internal_event, event_metadata):
         with self._lock:
             for sorted_event in self._manager_config.evaluation_order:
                 if sorted_event in self._get_sdk_event_if_applicable(sdk_internal_event):
-                    _LOGGER.debug("EventsManager: Firing Sdk event %s", sorted_event)
                     if self._get_event_handler(sorted_event) != None:
-                        notify_event = threading.Thread(target=self._events_delivery.deliver, args=[sorted_event, event_metadata, self._get_event_handler(sorted_event)],
-                                                        name='SplitSDKEventNotify', daemon=True)
-                        notify_event.start()
-                        self._set_sdk_event_triggered(sorted_event)
+                        self._fire_sdk_event(sorted_event, event_metadata)
+
+                    # if client is not subscribed to SDK_READY
+                    if sorted_event == SdkEvent.SDK_READY and self._get_event_handler(sorted_event) == None:
+                        _LOGGER.debug("EventsManager: Registering SDK_READY event as fired")
+                        self._active_subscriptions[SdkEvent.SDK_READY] = ActiveSubscriptions(True, None)
+
 
     def destroy(self):
         with self._lock:
             self._active_subscriptions = {}
             self._internal_events_status = {}
 
+    def _fire_sdk_event(self, sdk_event, event_metadata):
+        _LOGGER.debug("EventsManager: Firing Sdk event %s", sdk_event)
+        notify_event = threading.Thread(target=self._events_delivery.deliver, args=[sdk_event, event_metadata, self._get_event_handler(sdk_event)],
+                                        name='SplitSDKEventNotify', daemon=True)
+        notify_event.start()
+        self._set_sdk_event_triggered(sdk_event)
+
     def _event_already_triggered(self, sdk_event):
         if self._active_subscriptions.get(sdk_event) != None:
             return self._active_subscriptions.get(sdk_event).triggered
diff --git a/splitio/storage/inmemmory.py b/splitio/storage/inmemmory.py
index 75097b14..675478d3 100644
--- a/splitio/storage/inmemmory.py
+++ b/splitio/storage/inmemmory.py
@@ -547,10 +547,11 @@ def update(self, to_add, to_delete, new_change_number):
         to_notify = []
         [to_notify.append(feature.name) for feature in to_add]
         to_notify.extend(to_delete)
-        self._internal_event_queue.put(
-            SdkInternalEventNotification(
-                SdkInternalEvent.FLAGS_UPDATED,
-                EventsMetadata(SdkEventType.FLAG_UPDATE, set(to_notify))))
+        if len(to_notify) > 0:
+            self._internal_event_queue.put(
+                SdkInternalEventNotification(
+                    SdkInternalEvent.FLAGS_UPDATED,
+                    EventsMetadata(SdkEventType.FLAG_UPDATE, set(to_notify))))
 
     def _put(self, feature_flag):
         """
diff --git a/splitio/sync/synchronizer.py b/splitio/sync/synchronizer.py
index 8685d479..71194d26 100644
--- a/splitio/sync/synchronizer.py
+++ b/splitio/sync/synchronizer.py
@@ -329,9 +329,6 @@ def start_periodic_data_recording(self):
         for task in self._periodic_data_recording_tasks:
             task.start()
 
-        if self._split_tasks.internal_events_task:
-            self._split_tasks.internal_events_task.start()
-
     def stop_periodic_data_recording(self, blocking):
         """
         Stop recorders.
@@ -883,8 +880,6 @@ def start_periodic_fetching(self):
             self._split_tasks.split_task.start()
         if self._split_tasks.segment_task is not None:
             self._split_tasks.segment_task.start()
-        if self._split_tasks.internal_events_task:
-            self._split_tasks.internal_events_task.start()
 
     def stop_periodic_fetching(self):
         """Stop fetchers for feature flags and segments."""
diff --git a/splitio/version.py b/splitio/version.py
index ea7d787e..4f40eda2 100644
--- a/splitio/version.py
+++ b/splitio/version.py
@@ -1 +1 @@
-__version__ = '10.5.1'
\ No newline at end of file
+__version__ = '10.6.0'
\ No newline at end of file
diff --git a/tests/integration/test_client_e2e.py b/tests/integration/test_client_e2e.py
index 05e25b51..0b2fe70f 100644
--- a/tests/integration/test_client_e2e.py
+++ b/tests/integration/test_client_e2e.py
@@ -28,6 +28,7 @@
 from splitio.events.events_manager_config import EventsManagerConfig
 from splitio.events.events_task import EventsTask
 from splitio.models import splits, segments, rule_based_segments
+from splitio.models.events import SdkEvent
 from splitio.models.fallback_config import FallbackTreatmentsConfiguration, FallbackTreatmentCalculator
 from splitio.models.fallback_treatment import FallbackTreatment
 from splitio.recorder.recorder import StandardRecorder, PipelinedRecorder, StandardRecorderAsync, PipelinedRecorderAsync
@@ -2424,6 +2425,182 @@ def clear_cache(self):
         for key in keys_to_delete:
             redis_client.delete(key)
 
+class InMemoryEventsNotificationTests(object):
+    """Inmemory storage-based events notification tests."""
+
+    ready_flag = False
+    timeout_flag = False
+
+    def test_sdk_timeout_fire(self):
+        """Test that SDK_READY_TIMED_OUT fires when the factory is not ready in time."""
+        factory2 = get_factory('some_api_key')
+        client = factory2.client()
+        client.on(SdkEvent.SDK_READY_TIMED_OUT, self._timeout_callback)
+        try:
+            factory2.block_until_ready(1)
+        except Exception as e:
+            print(e)
+            pass
+
+        time.sleep(1)
+        assert self.timeout_flag
+
+        """Shut down the factory."""
+        event = threading.Event()
+        factory2.destroy(event)
+        event.wait()
+
+    def test_sdk_ready(self):
+        """Prepare storages with test data."""
+        events_queue = queue.Queue()
+        split_storage = InMemorySplitStorage(events_queue)
+        segment_storage = InMemorySegmentStorage(events_queue)
+        rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
+
+        split_fn = os.path.join(os.path.dirname(__file__), 'files', 'splitChanges.json')
+        with open(split_fn, 'r') as flo:
+            data = json.loads(flo.read())
+        for split in data['ff']['d']:
+            split_storage.update([splits.from_raw(split)], [], 0)
+
+        for rbs in data['rbs']['d']:
+            rb_segment_storage.update([rule_based_segments.from_raw(rbs)], [], 0)
+
+        segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentEmployeesChanges.json')
+        with open(segment_fn, 'r') as flo:
+            data = json.loads(flo.read())
+        segment_storage.put(segments.from_raw(data))
+
+        segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentHumanBeignsChanges.json')
+        with open(segment_fn, 'r') as flo:
+            data = json.loads(flo.read())
+        segment_storage.put(segments.from_raw(data))
+
+        telemetry_storage = InMemoryTelemetryStorage()
+        telemetry_producer = TelemetryStorageProducer(telemetry_storage)
+        telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
+        telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer()
+
+        storages = {
+            'splits': split_storage,
+            'segments': segment_storage,
+            'rule_based_segments': rb_segment_storage,
+            'impressions': InMemoryImpressionStorage(5000, telemetry_runtime_producer),
+            'events': InMemoryEventStorage(5000, telemetry_runtime_producer),
+        }
+        impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener
+        recorder = StandardRecorder(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, imp_counter=ImpressionsCounter())
+        events_manager = EventsManager(EventsManagerConfig(), EventsDelivery())
+        internal_events_task = EventsTask(events_manager.notify_internal_event, events_queue)
+
+        # Since we are passing None as the SDK ready flag, the factory will attempt the Redis telemetry call; use try/except to ignore the exception.
+        try:
+            factory = SplitFactory('some_api_key',
+                                   storages,
+                                   True,
+                                   recorder,
+                                   events_queue,
+                                   events_manager,
+                                   None,
+                                   telemetry_producer=telemetry_producer,
+                                   telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(),
+                                   fallback_treatment_calculator=FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(None, {'fallback_feature': FallbackTreatment("on-local", '{"prop": "val"}')}))
+                                   ) # pylint:disable=attribute-defined-outside-init
+            internal_events_task.start()
+        except:
+            pass
+
+        client = factory.client()
+        client.on(SdkEvent.SDK_READY, self._ready_callback)
+        factory.block_until_ready(5)
+        assert self.ready_flag
+
+        """Shut down the factory."""
+        event = threading.Event()
+        factory.destroy(event)
+        event.wait()
+
+    def test_sdk_ready_fire_later(self):
+        """Prepare storages with test data."""
+        events_queue = queue.Queue()
+        split_storage = InMemorySplitStorage(events_queue)
+        segment_storage = InMemorySegmentStorage(events_queue)
+        rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
+
+        split_fn = os.path.join(os.path.dirname(__file__), 'files', 'splitChanges.json')
+        with open(split_fn, 'r') as flo:
+            data = json.loads(flo.read())
+        for split in data['ff']['d']:
+            split_storage.update([splits.from_raw(split)], [], 0)
+
+        for rbs in data['rbs']['d']:
+            rb_segment_storage.update([rule_based_segments.from_raw(rbs)], [], 0)
+
+        segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentEmployeesChanges.json')
+        with open(segment_fn, 'r') as flo:
+            data = json.loads(flo.read())
+        segment_storage.put(segments.from_raw(data))
+
+        segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentHumanBeignsChanges.json')
+        with open(segment_fn, 'r') as flo:
+            data = json.loads(flo.read())
+        segment_storage.put(segments.from_raw(data))
+
+        telemetry_storage = InMemoryTelemetryStorage()
+        telemetry_producer = TelemetryStorageProducer(telemetry_storage)
+        telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
+        telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer()
+
+        storages = {
+            'splits': split_storage,
+            'segments': segment_storage,
+            'rule_based_segments': rb_segment_storage,
+            'impressions': InMemoryImpressionStorage(5000, telemetry_runtime_producer),
+            'events': InMemoryEventStorage(5000, telemetry_runtime_producer),
+        }
+        impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener
+        recorder = StandardRecorder(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, imp_counter=ImpressionsCounter())
+        events_manager = EventsManager(EventsManagerConfig(), EventsDelivery())
+        internal_events_task = EventsTask(events_manager.notify_internal_event, events_queue)
+
+        # Since we are passing None as the SDK ready flag, the factory will attempt the Redis telemetry call; use try/except to ignore the exception.
+        try:
+            factory = SplitFactory('some_api_key',
+                                   storages,
+                                   True,
+                                   recorder,
+                                   events_queue,
+                                   events_manager,
+                                   None,
+                                   telemetry_producer=telemetry_producer,
+                                   telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(),
+                                   fallback_treatment_calculator=FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(None, {'fallback_feature': FallbackTreatment("on-local", '{"prop": "val"}')}))
+                                   ) # pylint:disable=attribute-defined-outside-init
+            internal_events_task.start()
+        except:
+            pass
+
+        client = factory.client()
+        factory.block_until_ready(5)
+
+        assert client.get_treatment('user1', 'sample_feature', evaluation_options=EvaluationOptions({"prop": "value"})) == 'on'
+
+        self.ready_flag = False
+        client.on(SdkEvent.SDK_READY, self._ready_callback)
+        time.sleep(1)
+        assert self.ready_flag
+
+        """Shut down the factory."""
+        event = threading.Event()
+        factory.destroy(event)
+        event.wait()
+
+    def _ready_callback(self, metadata):
+        self.ready_flag = True
+
+    def _timeout_callback(self, metadata):
+        self.timeout_flag = True
+
 class InMemoryIntegrationAsyncTests(object):
     """Inmemory storage-based integration tests."""
 
@@ -4984,4 +5161,4 @@ async def _manager_methods_async(factory, skip_rbs=False):
         return
 
     assert len(await manager.split_names()) == 9
-    assert len(await manager.splits()) == 9
+    assert len(await manager.splits()) == 9
\ No newline at end of file
diff --git a/tests/integration/test_streaming_e2e.py b/tests/integration/test_streaming_e2e.py
index 764475de..a673c65c 100644
--- a/tests/integration/test_streaming_e2e.py
+++ b/tests/integration/test_streaming_e2e.py
@@ -10,6 +10,8 @@
 from queue import Queue
 from splitio.optional.loaders import asyncio
 from splitio.client.factory import get_factory, get_factory_async
+from splitio.models.events import SdkEvent
+from splitio.events.events_metadata import SdkEventType
 from tests.helpers.mockserver import SSEMockServer, SplitMockServer
 from urllib.parse import parse_qs
 from splitio.models.telemetry import StreamingEventTypes, SSESyncMode
@@ -18,6 +20,9 @@
 class StreamingIntegrationTests(object):
     """Test streaming operation and failover."""
 
+    update_flag = False
+    metadata = []
+
     def test_happiness(self):
         """Test initialization & splits/segment updates."""
         auth_server_response = {
@@ -70,6 +75,7 @@
         }
 
         factory = get_factory('some_apikey', **kwargs)
+        factory.client().on(SdkEvent.SDK_UPDATE, self._update_callback)
         factory.block_until_ready(1)
         assert factory.ready
         assert factory.client().get_treatment('maldo', 'split1') == 'on'
@@ -87,6 +93,13 @@
         split_changes[2] = {'ff': {'s': 2, 't': 2, 'd': []}, 'rbs': {'s': -1, 't': -1, 'd': []}}
         sse_server.publish(make_split_change_event(2))
         time.sleep(1)
+        flag = False
+        for meta in self.metadata:
+            if 'split1' in meta.get_names():
+                assert meta.get_type() == SdkEventType.FLAG_UPDATE
+                flag = True
+        assert flag
+
         assert factory.client().get_treatment('maldo', 'split1') == 'off'
 
         split_changes[2] = {
@@ -110,14 +123,28 @@
         sse_server.publish(make_split_change_event(3))
         time.sleep(1)
+
+        self._reset_flags()
         sse_server.publish(make_segment_change_event('segment1', 1))
         time.sleep(1)
-
+        assert self.update_flag
+        assert self.metadata[-1].get_type() == SdkEventType.SEGMENT_UPDATE
+        flag = False
+        for meta in self.metadata:
+            if 'split2' in meta.get_names():
+                assert meta.get_type() == SdkEventType.FLAG_UPDATE
+                flag = True
+        assert flag
+
         assert factory.client().get_treatment('pindon', 'split2') == 'off'
         assert factory.client().get_treatment('maldo', 'split2') == 'on'
 
+        self._reset_flags()
         sse_server.publish(make_split_fast_change_event(4))
         time.sleep(1)
+        assert self.update_flag
+        assert self.metadata[-1].get_type() == SdkEventType.FLAG_UPDATE
+        assert 'split5' in self.metadata[-1].get_names()
         assert factory.client().get_treatment('maldo', 'split5') == 'on'
 
         # Validate the SSE request
 
@@ -212,6 +239,13 @@
         sse_server.stop()
         split_backend.stop()
 
+    def _update_callback(self, metadata):
+        self.update_flag = True
+        self.metadata.append(metadata)
+
+    def _reset_flags(self):
+        self.update_flag = False
+
     def test_occupancy_flicker(self):
         """Test that changes in occupancy switch between polling & streaming properly."""
         auth_server_response = {