Skip to content

Commit 27792de

Browse files
authored
Merge pull request #611 from splitio/FME-12221-sdk-event-task
Fme 12221 sdk event task
2 parents b92badd + 7c91986 commit 27792de

File tree

3 files changed

+179
-0
lines changed

3 files changed

+179
-0
lines changed

splitio/events/events_task.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
"""sdk internal events task."""
2+
import logging
3+
import threading
4+
import abc
5+
6+
_LOGGER = logging.getLogger(__name__)
7+
8+
class EventsTaskBase(object, metaclass=abc.ABCMeta):
9+
"""task template."""
10+
11+
@abc.abstractmethod
12+
def is_running(self):
13+
"""Return whether the task is running."""
14+
15+
@abc.abstractmethod
16+
def start(self):
17+
"""Start task."""
18+
19+
@abc.abstractmethod
20+
def stop(self):
21+
"""Stop task."""
22+
23+
class EventsTask(EventsTaskBase):
24+
"""sdk internal events processing task."""
25+
26+
_centinel = object()
27+
28+
def __init__(self, notify_internal_events, internal_events_queue):
29+
"""
30+
Class constructor.
31+
32+
:param synchronize_segment: handler to perform segment synchronization on incoming event
33+
:type synchronize_segment: function
34+
35+
:param segment_queue: queue with segment updates notifications
36+
:type segment_queue: queue
37+
"""
38+
self._internal_events_queue = internal_events_queue
39+
self._handler = notify_internal_events
40+
self._running = False
41+
self._worker = None
42+
43+
def is_running(self):
44+
"""Return whether the working is running."""
45+
return self._running
46+
47+
def _run(self):
48+
"""Run worker handler."""
49+
while self.is_running():
50+
event = self._internal_events_queue.get()
51+
if not self.is_running():
52+
break
53+
54+
if event == self._centinel:
55+
continue
56+
57+
_LOGGER.debug('Processing sdk internal event: %s', event.internal_event)
58+
try:
59+
self._handler(event.internal_event, event.metadata)
60+
except Exception:
61+
_LOGGER.error('Exception raised in events manager')
62+
_LOGGER.debug('Exception information: ', exc_info=True)
63+
64+
def start(self):
65+
"""Start worker."""
66+
if self.is_running():
67+
_LOGGER.debug('Worker is already running')
68+
return
69+
self._running = True
70+
71+
_LOGGER.debug('Starting Event Task worker')
72+
self._worker = threading.Thread(target=self._run, name='EventsTaskWorker', daemon=True)
73+
self._worker.start()
74+
75+
def stop(self):
76+
"""Stop worker."""
77+
_LOGGER.debug('Stopping Event Task worker')
78+
if not self.is_running():
79+
_LOGGER.debug('Worker is not running. Ignoring.')
80+
return
81+
self._running = False
82+
self._internal_events_queue.put(self._centinel)

splitio/models/notification.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,29 @@ def notification_type(self):
170170
def split_name(self):
171171
return self._split_name
172172

173+
class SdkInternalEventNotification(object): # pylint: disable=too-many-instance-attributes
174+
"""SdkInternalEventNotification model object."""
175+
176+
def __init__(self, internal_event, metadata):
177+
"""
178+
Class constructor.
179+
180+
:param internal_event: internal event object
181+
:type channel: SdkInternalEvent
182+
:param metadata: metadata associated with event
183+
:type change_number: EventsMetadata
184+
185+
"""
186+
self._internal_event = internal_event
187+
self._metadata = metadata
188+
189+
@property
190+
def internal_event(self):
191+
return self._internal_event
192+
193+
@property
194+
def metadata(self):
195+
return self._metadata
173196

174197
_NOTIFICATION_MAPPERS = {
175198
Type.SPLIT_UPDATE: lambda c, d: SplitChangeNotification(c, Type.SPLIT_UPDATE, d['changeNumber']),

tests/events/test_events_task.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
"""EventsManager test module."""
2+
import pytest
3+
import queue
4+
import time
5+
6+
from splitio.models.events import SdkInternalEvent
7+
from splitio.models.notification import SdkInternalEventNotification
8+
from splitio.events.events_metadata import EventsMetadata
9+
from splitio.events.events_metadata import SdkEventType
10+
from splitio.events.events_task import EventsTask
11+
12+
13+
class EventsTaskTests(object):
14+
"""Tests for EventsManager."""
15+
16+
internal_event = None
17+
metadata = None
18+
19+
def test_firing_events(self):
20+
events_queue = queue.Queue()
21+
events_task = EventsTask(self._event_callback, events_queue)
22+
23+
events_task.start()
24+
assert events_task.is_running()
25+
26+
metadata = EventsMetadata(SdkEventType.FLAG_UPDATE, { "feature1" })
27+
events_queue.put(SdkInternalEventNotification(SdkInternalEvent.SDK_READY, metadata))
28+
time.sleep(.5)
29+
assert self.internal_event == SdkInternalEvent.SDK_READY
30+
self._verify_metadata(metadata)
31+
32+
self._reset_flags()
33+
events_queue.put(SdkInternalEventNotification(SdkInternalEvent.RB_SEGMENTS_UPDATED, metadata))
34+
time.sleep(.5)
35+
assert self.internal_event == SdkInternalEvent.RB_SEGMENTS_UPDATED
36+
self._verify_metadata(metadata)
37+
38+
events_task.stop()
39+
time.sleep(.5)
40+
assert not events_task.is_running()
41+
42+
def test_on_error(self):
43+
events_queue = queue.Queue()
44+
45+
def handler_sync(internal_event, metadata):
46+
raise Exception('some')
47+
48+
events_task = EventsTask(handler_sync, events_queue)
49+
events_task.start()
50+
assert events_task.is_running()
51+
52+
events_queue.put(SdkInternalEventNotification(SdkInternalEvent.SDK_READY, None))
53+
54+
with pytest.raises(Exception):
55+
events_task._handler()
56+
57+
assert events_task.is_running()
58+
events_task.stop()
59+
time.sleep(1)
60+
assert not events_task.is_running()
61+
62+
def _reset_flags(self):
63+
self.internal_event = None
64+
self.metadata = None
65+
66+
def _event_callback(self, internal_event, metadata):
67+
self.internal_event = internal_event
68+
self.metadata = metadata
69+
70+
def _verify_metadata(self, metadata):
71+
assert metadata.get_type() == self.metadata.get_type()
72+
assert metadata.get_names() == self.metadata.get_names()
73+
74+

0 commit comments

Comments
 (0)