Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ from luno_python.client import Client
c = Client(api_key_id='key_id', api_key_secret='key_secret')
try:
res = c.get_ticker(pair='XBTZAR')
print res
print(res)
except Exception as e:
print e
print(e)
```

### License
Expand Down
18 changes: 18 additions & 0 deletions examples/stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import asyncio

from luno_python.stream_client import stream_market
from luno_python.api_types import format_state

def handle_update(pair, state, update):
print(format_state(pair, state))
if update is not None:
print(update)

asyncio.get_event_loop().run_until_complete(
stream_market(
pair="XBTZAR",
api_key_id="", # API Key goes here
api_key_secret="", # and API Secret goes here
update_callback=handle_update,
)
)
26 changes: 26 additions & 0 deletions luno_python/api_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from collections import namedtuple
from decimal import Decimal
from typing import List

DEC_0 = Decimal("0")

Order = namedtuple("Order", "order_id price volume")

MarketState = namedtuple("MarketState", "sequence asks bids status")

Pair = namedtuple("Pair", "base counter")

def format_orderbook(pair: Pair, asks: List[Order], bids: List[Order]):
if not bids or not asks:
return "Empty Orderbook"

bid_sum = sum((o.price * o.volume for o in bids), DEC_0)
ask_sum = sum((o.volume for o in asks), DEC_0)

mid = (asks[0].price + bids[0].price) / 2

return f"{bid_sum} {pair.counter} - {mid} - {ask_sum} {pair.base}"

def format_state(pair: Pair, state: MarketState):
orderbook = format_orderbook(pair, state.asks, state.bids)
return f"[{state.sequence}] {orderbook}"
4 changes: 2 additions & 2 deletions luno_python/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ class Client(BaseClient):
c = Client(api_key_id='key_id', api_key_secret='key_secret')
try:
res = c.get_ticker(pair='XBTZAR')
print res
print(res)
except Exception as e:
print e
print(e)
"""

def cancel_withdrawal(self, id):
Expand Down
201 changes: 201 additions & 0 deletions luno_python/stream_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
"""
The stream client can be used to receive live updates to the orderbook.
It also maintains a representation of the Luno orderbook correctly updated for each event.

For example usage see examples/stream.py
"""

import asyncio
from decimal import Decimal
import json
from typing import Callable, Dict, List
import websockets

from .api_types import DEC_0, Order, MarketState, Pair

DEFAULT_URL = "wss://ws.luno.com"

StateUpdate = Callable[[Pair, MarketState, dict], None]

class OutOfOrderMessageException(Exception):
pass


def _flatten_orders(orders, reverse):
return sorted(orders.values(), key=lambda o: o.price, reverse=reverse)


def _decrement_trade(orders: Dict[str, Order], order_id: str, volume: Decimal):
order = orders.pop(order_id, None)
if order is None:
return

new_order = order._replace(volume=order.volume - volume)
if new_order.volume > DEC_0:
orders[order_id] = new_order


class _MarketStreamState:
def __init__(self, first: dict):
if first is None:
raise Exception("Unable to use empty message to initialise market state")

Check warning on line 41 in luno_python/stream_client.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this generic exception class with a more specific one.

See more on https://sonarcloud.io/project/issues?id=luno_luno-python&issues=AZ0bGJLjBBDVKv5IdKf7&open=AZ0bGJLjBBDVKv5IdKf7&pullRequest=76

def conv_message(msg):
return Order(
msg['id'],
Decimal(msg['price']),
Decimal(msg['volume']),
)

bids = [conv_message(m) for m in first['bids']]
asks = [conv_message(m) for m in first['asks']]
self._bids = {b.order_id: b for b in bids}
self._asks = {a.order_id: a for a in asks}
self._sequence = first['sequence']
self._trades = []
self._status = first['status']

def get_asks(self):
return _flatten_orders(self._asks, False)

def get_bids(self):
return _flatten_orders(self._bids, True)

def get_status(self):
return self._status

def get_sequence(self):
return self._sequence

def get_snapshot(self):
return MarketState(
sequence=self.get_sequence(),
asks=self.get_asks(),
bids=self.get_bids(),
status=self.get_status(),
)

def process_update(self, update: dict):
if update is None:
return

seq = update['sequence']
if int(seq) != int(self._sequence)+1:
raise OutOfOrderMessageException()

trades = update.get('trade_updates')
if trades:
self._process_trades(trades)

create = update.get('create_update')
if create:
self._process_create(create)

delete_upd = update.get('delete_update')
if delete_upd:
self._process_delete(delete_upd)

status_upd = update.get('status_update')
if status_upd:
self._process_status(status_upd)

self._sequence = seq

def _process_trades(self, trade_updates: List[dict]):
for t in trade_updates:
maker_id = t['maker_order_id']
volume = Decimal(t['base'])

_decrement_trade(self._asks, maker_id, volume)
_decrement_trade(self._bids, maker_id, volume)

def _process_create(self, create_update: dict):
o = Order(
create_update['order_id'],
Decimal(create_update['price']),
Decimal(create_update['volume']),
)
if create_update['type'] == "ASK":
self._asks[o.order_id] = o
elif create_update['type'] == "BID":
self._bids[o.order_id] = o

def _process_delete(self, delete_update: dict):
order_id = delete_update['order_id']
self._asks.pop(order_id, None)
self._bids.pop(order_id, None)

def _process_status(self, status_update: dict):
self._status = status_update['status']


async def _read_from_websocket(ws, pair: Pair, update_f: StateUpdate):
state = None
is_first = True

async for message in ws:
try:
body = json.loads(message)
except ValueError:
raise Exception(message)

Check warning on line 140 in luno_python/stream_client.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this generic exception class with a more specific one.

See more on https://sonarcloud.io/project/issues?id=luno_luno-python&issues=AZ0bGJLjBBDVKv5IdKf8&open=AZ0bGJLjBBDVKv5IdKf8&pullRequest=76
Comment on lines +137 to +140
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Chain the exception and use a specific exception type.

When re-raising inside an except clause, chain the original exception to preserve the traceback. Additionally, as flagged by static analysis, consider using a more specific exception type.

Proposed fix
+class StreamParseException(Exception):
+    pass
+
 async def _read_from_websocket(ws, pair: Pair, update_f: StateUpdate):
     state = None
     is_first = True

     async for message in ws:
         try:
             body = json.loads(message)
         except ValueError:
-            raise Exception(message)
+            raise StreamParseException(f"Failed to parse message: {message}") from None
🧰 Tools
🪛 GitHub Check: SonarCloud Code Analysis

[warning] 140-140: Replace this generic exception class with a more specific one.

See more on https://sonarcloud.io/project/issues?id=luno_luno-python&issues=AZ0bGJLjBBDVKv5IdKf8&open=AZ0bGJLjBBDVKv5IdKf8&pullRequest=76

🪛 Ruff (0.15.6)

[warning] 140-140: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@luno_python/stream_client.py` around lines 137 - 140, The current try/except
around json.loads(message) should preserve the original traceback and raise a
more specific error: change the except ValueError handler to catch
json.JSONDecodeError (or "except ValueError as e" if JSONDecodeError isn't
imported) and re-raise a specific exception while chaining the original (e.g.
raise ValueError(f"Failed to parse JSON message: {message}") from e or raise a
custom ParseError from e); update the handler that surrounds the json.loads call
(the body = json.loads(message) block) to use "as e" and include "from e" when
re-raising so the original exception is preserved.


if body == "": # Empty update, used as keepalive
body = None

if is_first:
is_first = False
state = _MarketStreamState(body)
update_f(pair, state.get_snapshot(), None)
continue

#could raise OutOfOrderMessageException
state.process_update(body)

update_f(pair, state.get_snapshot(), body)


async def _write_keep_alive(ws):
while True:
await ws.send('""')
await asyncio.sleep(60)


async def stream_market(
pair: str,
api_key_id: str,
api_key_secret: str,
update_callback: StateUpdate,
base_url: str = DEFAULT_URL,
):
"""Opens a stream to /api/1/stream/...

Stream orderbook information and maintain an orderbook state.

:param pair: str Amount to buy or sell in the pair base currency.
:param api_key_id: str
:param api_key_secret: str
:param update_callback: an StateUpdate function that will be called with new updates.
"""
if len(pair) != 6:
raise Exception("Invalid pair")

Check warning on line 180 in luno_python/stream_client.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this generic exception class with a more specific one.

See more on https://sonarcloud.io/project/issues?id=luno_luno-python&issues=AZ0bGJLjBBDVKv5IdKf9&open=AZ0bGJLjBBDVKv5IdKf9&pullRequest=76

p = Pair(pair[:3].upper(), pair[3:].upper())
url = '/'.join([base_url, 'api/1/stream', p.base + p.counter])

async with websockets.connect(
url,
origin='http://localhost/',
ping_interval=None,
max_size=2**21,
) as websocket:

auth = json.dumps({
'api_key_id': api_key_id,
'api_key_secret': api_key_secret,
})
await websocket.send(auth)

await asyncio.gather(
_read_from_websocket(websocket, p, update_callback),
_write_keep_alive(websocket),
)
Comment on lines +198 to +201
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Handle task cancellation when one coroutine fails.

If _read_from_websocket raises an exception (e.g., OutOfOrderMessageException), the _write_keep_alive task will continue running indefinitely. Consider using asyncio.TaskGroup (Python 3.11+) or wrapping with explicit cancellation logic.

Proposed fix for Python 3.11+
-        await asyncio.gather(
-            _read_from_websocket(websocket, p, update_callback),
-            _write_keep_alive(websocket),
-        )
+        async with asyncio.TaskGroup() as tg:
+            tg.create_task(_read_from_websocket(websocket, p, update_callback))
+            tg.create_task(_write_keep_alive(websocket))
Alternative fix for older Python versions
+        reader_task = asyncio.create_task(_read_from_websocket(websocket, p, update_callback))
+        keepalive_task = asyncio.create_task(_write_keep_alive(websocket))
+        try:
+            await reader_task
+        finally:
+            keepalive_task.cancel()
+            try:
+                await keepalive_task
+            except asyncio.CancelledError:
+                pass
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
await asyncio.gather(
_read_from_websocket(websocket, p, update_callback),
_write_keep_alive(websocket),
)
async with asyncio.TaskGroup() as tg:
tg.create_task(_read_from_websocket(websocket, p, update_callback))
tg.create_task(_write_keep_alive(websocket))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@luno_python/stream_client.py` around lines 198 - 201, The current
asyncio.gather call launching _read_from_websocket(websocket, p,
update_callback) and _write_keep_alive(websocket) can leave the keep-alive task
running if the reader raises; change this to create managed tasks and cancel the
other when one fails: either use asyncio.TaskGroup (Python 3.11+) to run both
_read_from_websocket and _write_keep_alive so the group cancels remaining tasks
on exception, or for older Python create two tasks via asyncio.create_task,
await asyncio.wait(..., return_when=FIRST_EXCEPTION), detect the first finished
task raising, cancel the pending task(s) (e.g., the keep-alive), and await their
cancellation; apply these changes around the code that currently calls
asyncio.gather so _write_keep_alive is reliably stopped when
_read_from_websocket fails.