Merge Streaming Client changes from Adam Hicks#76
Merge Streaming Client changes from Adam Hicks#76YinYin-blip wants to merge 9 commits intoluno:mainfrom
Conversation
As recommended by reviewer comment
Streamingapi
📝 WalkthroughWalkthroughThe pull request introduces a new async streaming client for the Luno cryptocurrency exchange API alongside supporting infrastructure. Changes include: updating Python 2-style print statements to Python 3 syntax in existing documentation, creating data container types (Order, MarketState, Pair) with formatting utilities, adding a new websocket-based streaming module ( Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (3)
examples/stream.py (1)
11-18: Useasyncio.run()instead of deprecatedget_event_loop().run_until_complete().
asyncio.get_event_loop()is deprecated since Python 3.10 and emits aDeprecationWarningwhen called from the main thread without a running event loop. Useasyncio.run()for cleaner, forward-compatible code.Proposed fix
-asyncio.get_event_loop().run_until_complete( +asyncio.run( stream_market( pair="XBTZAR", api_key_id="", # API Key goes here api_key_secret="", # and API Secret goes here update_callback=handle_update, ) )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/stream.py` around lines 11 - 18, Replace the deprecated asyncio.get_event_loop().run_until_complete(...) call with asyncio.run(...) by invoking asyncio.run(stream_market(...)) to run the top-level coroutine; specifically update the call that currently wraps stream_market(pair="XBTZAR", api_key_id="", api_key_secret="", update_callback=handle_update) so it uses asyncio.run and otherwise passes the same arguments to stream_market and keeps handle_update as the callback.luno_python/stream_client.py (2)
39-41: Use a specific exception type for initialisation errors.As flagged by static analysis, replace the generic
Exceptionwith a more descriptive custom exception.Proposed fix
+class StreamInitException(Exception): + pass + class _MarketStreamState: def __init__(self, first: dict): if first is None: - raise Exception("Unable to use empty message to initialise market state") + raise StreamInitException("Unable to use empty message to initialise market state")🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@luno_python/stream_client.py` around lines 39 - 41, The constructor StreamClient.__init__ currently raises a generic Exception when given a None initial message; define and use a specific exception type (e.g., InvalidInitialMessageError or InitializationError) in the module and replace the raise Exception(...) with raise InvalidInitialMessageError("Unable to use empty message to initialise market state") (or similar message) so callers can catch this specific error; ensure the new exception class inherits from Exception and is declared at module level near other exceptions or top of stream_client.py.
179-180: Use a specific exception type for validation errors.As flagged by static analysis, replace the generic
Exception. Also note that the 6-character assumption may not hold for all currency pairs if Luno introduces currencies with different code lengths.Proposed fix
+class InvalidPairException(Exception): + pass + async def stream_market( pair: str, ... ): if len(pair) != 6: - raise Exception("Invalid pair") + raise InvalidPairException(f"Invalid pair: {pair}. Expected 6-character pair code.")🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@luno_python/stream_client.py` around lines 179 - 180, Replace the generic Exception raised for an invalid pair with a specific validation exception and relax the hard-coded 6-char assumption: validate the pair variable using a pattern (e.g. import re and use re.match(r'^[A-Z]{3,4}[A-Z]{3,4}$', pair)) or a custom rule that allows variable-length currency codes, and raise a ValueError or a new InvalidPairError instead of Exception; update the check around pair in stream_client.py (the block that currently does if len(pair) != 6: raise Exception("Invalid pair")) to use the new validation and exception.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@luno_python/stream_client.py`:
- Around line 198-201: The current asyncio.gather call launching
_read_from_websocket(websocket, p, update_callback) and
_write_keep_alive(websocket) can leave the keep-alive task running if the reader
raises; change this to create managed tasks and cancel the other when one fails:
either use asyncio.TaskGroup (Python 3.11+) to run both _read_from_websocket and
_write_keep_alive so the group cancels remaining tasks on exception, or for
older Python create two tasks via asyncio.create_task, await asyncio.wait(...,
return_when=FIRST_EXCEPTION), detect the first finished task raising, cancel the
pending task(s) (e.g., the keep-alive), and await their cancellation; apply
these changes around the code that currently calls asyncio.gather so
_write_keep_alive is reliably stopped when _read_from_websocket fails.
- Around line 137-140: The current try/except around json.loads(message) should
preserve the original traceback and raise a more specific error: change the
except ValueError handler to catch json.JSONDecodeError (or "except ValueError
as e" if JSONDecodeError isn't imported) and re-raise a specific exception while
chaining the original (e.g. raise ValueError(f"Failed to parse JSON message:
{message}") from e or raise a custom ParseError from e); update the handler that
surrounds the json.loads call (the body = json.loads(message) block) to use "as
e" and include "from e" when re-raising so the original exception is preserved.
---
Nitpick comments:
In `@examples/stream.py`:
- Around line 11-18: Replace the deprecated
asyncio.get_event_loop().run_until_complete(...) call with asyncio.run(...) by
invoking asyncio.run(stream_market(...)) to run the top-level coroutine;
specifically update the call that currently wraps stream_market(pair="XBTZAR",
api_key_id="", api_key_secret="", update_callback=handle_update) so it uses
asyncio.run and otherwise passes the same arguments to stream_market and keeps
handle_update as the callback.
In `@luno_python/stream_client.py`:
- Around line 39-41: The constructor StreamClient.__init__ currently raises a
generic Exception when given a None initial message; define and use a specific
exception type (e.g., InvalidInitialMessageError or InitializationError) in the
module and replace the raise Exception(...) with raise
InvalidInitialMessageError("Unable to use empty message to initialise market
state") (or similar message) so callers can catch this specific error; ensure
the new exception class inherits from Exception and is declared at module level
near other exceptions or top of stream_client.py.
- Around line 179-180: Replace the generic Exception raised for an invalid pair
with a specific validation exception and relax the hard-coded 6-char assumption:
validate the pair variable using a pattern (e.g. import re and use
re.match(r'^[A-Z]{3,4}[A-Z]{3,4}$', pair)) or a custom rule that allows
variable-length currency codes, and raise a ValueError or a new InvalidPairError
instead of Exception; update the check around pair in stream_client.py (the
block that currently does if len(pair) != 6: raise Exception("Invalid pair")) to
use the new validation and exception.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: a089ae67-fb3d-4396-a002-6fa13a8342e6
📒 Files selected for processing (5)
README.mdexamples/stream.pyluno_python/api_types.pyluno_python/client.pyluno_python/stream_client.py
| try: | ||
| body = json.loads(message) | ||
| except ValueError: | ||
| raise Exception(message) |
There was a problem hiding this comment.
Chain the exception and use a specific exception type.
When re-raising inside an except clause, chain the original exception to preserve the traceback. Additionally, as flagged by static analysis, consider using a more specific exception type.
Proposed fix
+class StreamParseException(Exception):
+ pass
+
async def _read_from_websocket(ws, pair: Pair, update_f: StateUpdate):
state = None
is_first = True
async for message in ws:
try:
body = json.loads(message)
except ValueError:
- raise Exception(message)
+ raise StreamParseException(f"Failed to parse message: {message}") from None🧰 Tools
🪛 GitHub Check: SonarCloud Code Analysis
[warning] 140-140: Replace this generic exception class with a more specific one.
🪛 Ruff (0.15.6)
[warning] 140-140: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@luno_python/stream_client.py` around lines 137 - 140, The current try/except
around json.loads(message) should preserve the original traceback and raise a
more specific error: change the except ValueError handler to catch
json.JSONDecodeError (or "except ValueError as e" if JSONDecodeError isn't
imported) and re-raise a specific exception while chaining the original (e.g.
raise ValueError(f"Failed to parse JSON message: {message}") from e or raise a
custom ParseError from e); update the handler that surrounds the json.loads call
(the body = json.loads(message) block) to use "as e" and include "from e" when
re-raising so the original exception is preserved.
| await asyncio.gather( | ||
| _read_from_websocket(websocket, p, update_callback), | ||
| _write_keep_alive(websocket), | ||
| ) |
There was a problem hiding this comment.
Handle task cancellation when one coroutine fails.
If _read_from_websocket raises an exception (e.g., OutOfOrderMessageException), the _write_keep_alive task will continue running indefinitely. Consider using asyncio.TaskGroup (Python 3.11+) or wrapping with explicit cancellation logic.
Proposed fix for Python 3.11+
- await asyncio.gather(
- _read_from_websocket(websocket, p, update_callback),
- _write_keep_alive(websocket),
- )
+ async with asyncio.TaskGroup() as tg:
+ tg.create_task(_read_from_websocket(websocket, p, update_callback))
+ tg.create_task(_write_keep_alive(websocket))Alternative fix for older Python versions
+ reader_task = asyncio.create_task(_read_from_websocket(websocket, p, update_callback))
+ keepalive_task = asyncio.create_task(_write_keep_alive(websocket))
+ try:
+ await reader_task
+ finally:
+ keepalive_task.cancel()
+ try:
+ await keepalive_task
+ except asyncio.CancelledError:
+ pass📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| await asyncio.gather( | |
| _read_from_websocket(websocket, p, update_callback), | |
| _write_keep_alive(websocket), | |
| ) | |
| async with asyncio.TaskGroup() as tg: | |
| tg.create_task(_read_from_websocket(websocket, p, update_callback)) | |
| tg.create_task(_write_keep_alive(websocket)) |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@luno_python/stream_client.py` around lines 198 - 201, The current
asyncio.gather call launching _read_from_websocket(websocket, p,
update_callback) and _write_keep_alive(websocket) can leave the keep-alive task
running if the reader raises; change this to create managed tasks and cancel the
other when one fails: either use asyncio.TaskGroup (Python 3.11+) to run both
_read_from_websocket and _write_keep_alive so the group cancels remaining tasks
on exception, or for older Python create two tasks via asyncio.create_task,
await asyncio.wait(..., return_when=FIRST_EXCEPTION), detect the first finished
task raising, cancel the pending task(s) (e.g., the keep-alive), and await their
cancellation; apply these changes around the code that currently calls
asyncio.gather so _write_keep_alive is reliably stopped when
_read_from_websocket fails.



Issue: #75
This code create an example streaming client that successfully creates and manages the web-socket connection
Summary by CodeRabbit
New Features
Documentation