Skip to content

Merge Streaming Client changes from Adam Hicks#76

Open
YinYin-blip wants to merge 9 commits intoluno:mainfrom
YinYin-blip:master
Open

Merge Streaming Client changes from Adam Hicks#76
YinYin-blip wants to merge 9 commits intoluno:mainfrom
YinYin-blip:master

Conversation

@YinYin-blip
Copy link

@YinYin-blip YinYin-blip commented Mar 23, 2026

Issue: #75

This code create an example streaming client that successfully creates and manages the web-socket connection

Summary by CodeRabbit

  • New Features

    • Added asynchronous streaming capability for real-time order book data and market state updates
    • Included example script demonstrating how to use the streaming client
    • Added utility functions for formatting and displaying market data
  • Documentation

    • Updated code examples throughout to use Python 3 syntax

@coderabbitai
Copy link

coderabbitai bot commented Mar 23, 2026

📝 Walkthrough

Walkthrough

The pull request introduces a new async streaming client for the Luno cryptocurrency exchange API alongside supporting infrastructure. Changes include: updating Python 2-style print statements to Python 3 syntax in existing documentation, creating data container types (Order, MarketState, Pair) with formatting utilities, adding a new websocket-based streaming module (stream_client.py) that manages real-time order book state with sequence validation and keepalive functionality, and providing a runnable example script demonstrating the streaming API usage.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐰 A stream of trades flows swift and bright,
With async dances through the night,
Keepalives hop to keep alive,
Order books now learn to thrive,
Python three leads us to the sky,
Where real-time markets never lie! ✨

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 9.52% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Title check ❓ Inconclusive The title is vague and generic, referring only to the author's name without describing the actual changes made to the codebase. Replace with a more descriptive title that highlights the primary change, such as 'Add streaming client for orderbook updates' or similar.
✅ Passed checks (1 passed)
Check name Status Explanation
Description check ✅ Passed The description is related to the changeset as it mentions adding a streaming client and WebSocket connection, which aligns with the new streaming functionality introduced.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@sonarqubecloud
Copy link

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (3)
examples/stream.py (1)

11-18: Use asyncio.run() instead of deprecated get_event_loop().run_until_complete().

asyncio.get_event_loop() is deprecated since Python 3.10 and emits a DeprecationWarning when called from the main thread without a running event loop. Use asyncio.run() for cleaner, forward-compatible code.

Proposed fix
-asyncio.get_event_loop().run_until_complete(
+asyncio.run(
     stream_market(
         pair="XBTZAR",
         api_key_id="", # API Key goes here
         api_key_secret="", # and API Secret goes here
         update_callback=handle_update,
     )
 )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@examples/stream.py` around lines 11 - 18, Replace the deprecated
asyncio.get_event_loop().run_until_complete(...) call with asyncio.run(...) by
invoking asyncio.run(stream_market(...)) to run the top-level coroutine;
specifically update the call that currently wraps stream_market(pair="XBTZAR",
api_key_id="", api_key_secret="", update_callback=handle_update) so it uses
asyncio.run and otherwise passes the same arguments to stream_market and keeps
handle_update as the callback.
luno_python/stream_client.py (2)

39-41: Use a specific exception type for initialisation errors.

As flagged by static analysis, replace the generic Exception with a more descriptive custom exception.

Proposed fix
+class StreamInitException(Exception):
+    pass
+
 class _MarketStreamState:
     def __init__(self, first: dict):
         if first is None:
-            raise Exception("Unable to use empty message to initialise market state")
+            raise StreamInitException("Unable to use empty message to initialise market state")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@luno_python/stream_client.py` around lines 39 - 41, The constructor
StreamClient.__init__ currently raises a generic Exception when given a None
initial message; define and use a specific exception type (e.g.,
InvalidInitialMessageError or InitializationError) in the module and replace the
raise Exception(...) with raise InvalidInitialMessageError("Unable to use empty
message to initialise market state") (or similar message) so callers can catch
this specific error; ensure the new exception class inherits from Exception and
is declared at module level near other exceptions or top of stream_client.py.

179-180: Use a specific exception type for validation errors.

As flagged by static analysis, replace the generic Exception. Also note that the 6-character assumption may not hold for all currency pairs if Luno introduces currencies with different code lengths.

Proposed fix
+class InvalidPairException(Exception):
+    pass
+
 async def stream_market(
         pair: str,
         ...
 ):
     if len(pair) != 6:
-        raise Exception("Invalid pair")
+        raise InvalidPairException(f"Invalid pair: {pair}. Expected 6-character pair code.")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@luno_python/stream_client.py` around lines 179 - 180, Replace the generic
Exception raised for an invalid pair with a specific validation exception and
relax the hard-coded 6-char assumption: validate the pair variable using a
pattern (e.g. import re and use re.match(r'^[A-Z]{3,4}[A-Z]{3,4}$', pair)) or a
custom rule that allows variable-length currency codes, and raise a ValueError
or a new InvalidPairError instead of Exception; update the check around pair in
stream_client.py (the block that currently does if len(pair) != 6: raise
Exception("Invalid pair")) to use the new validation and exception.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@luno_python/stream_client.py`:
- Around line 198-201: The current asyncio.gather call launching
_read_from_websocket(websocket, p, update_callback) and
_write_keep_alive(websocket) can leave the keep-alive task running if the reader
raises; change this to create managed tasks and cancel the other when one fails:
either use asyncio.TaskGroup (Python 3.11+) to run both _read_from_websocket and
_write_keep_alive so the group cancels remaining tasks on exception, or for
older Python create two tasks via asyncio.create_task, await asyncio.wait(...,
return_when=FIRST_EXCEPTION), detect the first finished task raising, cancel the
pending task(s) (e.g., the keep-alive), and await their cancellation; apply
these changes around the code that currently calls asyncio.gather so
_write_keep_alive is reliably stopped when _read_from_websocket fails.
- Around line 137-140: The current try/except around json.loads(message) should
preserve the original traceback and raise a more specific error: change the
except ValueError handler to catch json.JSONDecodeError (or "except ValueError
as e" if JSONDecodeError isn't imported) and re-raise a specific exception while
chaining the original (e.g. raise ValueError(f"Failed to parse JSON message:
{message}") from e or raise a custom ParseError from e); update the handler that
surrounds the json.loads call (the body = json.loads(message) block) to use "as
e" and include "from e" when re-raising so the original exception is preserved.

---

Nitpick comments:
In `@examples/stream.py`:
- Around line 11-18: Replace the deprecated
asyncio.get_event_loop().run_until_complete(...) call with asyncio.run(...) by
invoking asyncio.run(stream_market(...)) to run the top-level coroutine;
specifically update the call that currently wraps stream_market(pair="XBTZAR",
api_key_id="", api_key_secret="", update_callback=handle_update) so it uses
asyncio.run and otherwise passes the same arguments to stream_market and keeps
handle_update as the callback.

In `@luno_python/stream_client.py`:
- Around line 39-41: The constructor StreamClient.__init__ currently raises a
generic Exception when given a None initial message; define and use a specific
exception type (e.g., InvalidInitialMessageError or InitializationError) in the
module and replace the raise Exception(...) with raise
InvalidInitialMessageError("Unable to use empty message to initialise market
state") (or similar message) so callers can catch this specific error; ensure
the new exception class inherits from Exception and is declared at module level
near other exceptions or top of stream_client.py.
- Around line 179-180: Replace the generic Exception raised for an invalid pair
with a specific validation exception and relax the hard-coded 6-char assumption:
validate the pair variable using a pattern (e.g. import re and use
re.match(r'^[A-Z]{3,4}[A-Z]{3,4}$', pair)) or a custom rule that allows
variable-length currency codes, and raise a ValueError or a new InvalidPairError
instead of Exception; update the check around pair in stream_client.py (the
block that currently does if len(pair) != 6: raise Exception("Invalid pair")) to
use the new validation and exception.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: a089ae67-fb3d-4396-a002-6fa13a8342e6

📥 Commits

Reviewing files that changed from the base of the PR and between 838df1f and c2393a5.

📒 Files selected for processing (5)
  • README.md
  • examples/stream.py
  • luno_python/api_types.py
  • luno_python/client.py
  • luno_python/stream_client.py

Comment on lines +137 to +140
try:
body = json.loads(message)
except ValueError:
raise Exception(message)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Chain the exception and use a specific exception type.

When re-raising inside an except clause, chain the original exception to preserve the traceback. Additionally, as flagged by static analysis, consider using a more specific exception type.

Proposed fix
+class StreamParseException(Exception):
+    pass
+
 async def _read_from_websocket(ws, pair: Pair, update_f: StateUpdate):
     state = None
     is_first = True

     async for message in ws:
         try:
             body = json.loads(message)
         except ValueError:
-            raise Exception(message)
+            raise StreamParseException(f"Failed to parse message: {message}") from None
🧰 Tools
🪛 GitHub Check: SonarCloud Code Analysis

[warning] 140-140: Replace this generic exception class with a more specific one.

See more on https://sonarcloud.io/project/issues?id=luno_luno-python&issues=AZ0bGJLjBBDVKv5IdKf8&open=AZ0bGJLjBBDVKv5IdKf8&pullRequest=76

🪛 Ruff (0.15.6)

[warning] 140-140: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@luno_python/stream_client.py` around lines 137 - 140, The current try/except
around json.loads(message) should preserve the original traceback and raise a
more specific error: change the except ValueError handler to catch
json.JSONDecodeError (or "except ValueError as e" if JSONDecodeError isn't
imported) and re-raise a specific exception while chaining the original (e.g.
raise ValueError(f"Failed to parse JSON message: {message}") from e or raise a
custom ParseError from e); update the handler that surrounds the json.loads call
(the body = json.loads(message) block) to use "as e" and include "from e" when
re-raising so the original exception is preserved.

Comment on lines +198 to +201
await asyncio.gather(
_read_from_websocket(websocket, p, update_callback),
_write_keep_alive(websocket),
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Handle task cancellation when one coroutine fails.

If _read_from_websocket raises an exception (e.g., OutOfOrderMessageException), the _write_keep_alive task will continue running indefinitely. Consider using asyncio.TaskGroup (Python 3.11+) or wrapping with explicit cancellation logic.

Proposed fix for Python 3.11+
-        await asyncio.gather(
-            _read_from_websocket(websocket, p, update_callback),
-            _write_keep_alive(websocket),
-        )
+        async with asyncio.TaskGroup() as tg:
+            tg.create_task(_read_from_websocket(websocket, p, update_callback))
+            tg.create_task(_write_keep_alive(websocket))
Alternative fix for older Python versions
+        reader_task = asyncio.create_task(_read_from_websocket(websocket, p, update_callback))
+        keepalive_task = asyncio.create_task(_write_keep_alive(websocket))
+        try:
+            await reader_task
+        finally:
+            keepalive_task.cancel()
+            try:
+                await keepalive_task
+            except asyncio.CancelledError:
+                pass
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
await asyncio.gather(
_read_from_websocket(websocket, p, update_callback),
_write_keep_alive(websocket),
)
async with asyncio.TaskGroup() as tg:
tg.create_task(_read_from_websocket(websocket, p, update_callback))
tg.create_task(_write_keep_alive(websocket))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@luno_python/stream_client.py` around lines 198 - 201, The current
asyncio.gather call launching _read_from_websocket(websocket, p,
update_callback) and _write_keep_alive(websocket) can leave the keep-alive task
running if the reader raises; change this to create managed tasks and cancel the
other when one fails: either use asyncio.TaskGroup (Python 3.11+) to run both
_read_from_websocket and _write_keep_alive so the group cancels remaining tasks
on exception, or for older Python create two tasks via asyncio.create_task,
await asyncio.wait(..., return_when=FIRST_EXCEPTION), detect the first finished
task raising, cancel the pending task(s) (e.g., the keep-alive), and await their
cancellation; apply these changes around the code that currently calls
asyncio.gather so _write_keep_alive is reliably stopped when
_read_from_websocket fails.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants