-
Notifications
You must be signed in to change notification settings - Fork 41
Merge Streaming Client changes from Adam Hicks #76
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
c69c8f7
1481a99
346d5f4
b2b051d
fccc48c
27efc6b
f30b9bd
af6d000
c2393a5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| import asyncio | ||
|
|
||
| from luno_python.stream_client import stream_market | ||
| from luno_python.api_types import format_state | ||
|
|
||
| def handle_update(pair, state, update): | ||
| print(format_state(pair, state)) | ||
| if update is not None: | ||
| print(update) | ||
|
|
||
| asyncio.get_event_loop().run_until_complete( | ||
| stream_market( | ||
| pair="XBTZAR", | ||
| api_key_id="", # API Key goes here | ||
| api_key_secret="", # and API Secret goes here | ||
| update_callback=handle_update, | ||
| ) | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| from collections import namedtuple | ||
| from decimal import Decimal | ||
| from typing import List | ||
|
|
||
| DEC_0 = Decimal("0") | ||
|
|
||
| Order = namedtuple("Order", "order_id price volume") | ||
|
|
||
| MarketState = namedtuple("MarketState", "sequence asks bids status") | ||
|
|
||
| Pair = namedtuple("Pair", "base counter") | ||
|
|
||
| def format_orderbook(pair: Pair, asks: List[Order], bids: List[Order]): | ||
| if not bids or not asks: | ||
| return "Empty Orderbook" | ||
|
|
||
| bid_sum = sum((o.price * o.volume for o in bids), DEC_0) | ||
| ask_sum = sum((o.volume for o in asks), DEC_0) | ||
|
|
||
| mid = (asks[0].price + bids[0].price) / 2 | ||
|
|
||
| return f"{bid_sum} {pair.counter} - {mid} - {ask_sum} {pair.base}" | ||
|
|
||
| def format_state(pair: Pair, state: MarketState): | ||
| orderbook = format_orderbook(pair, state.asks, state.bids) | ||
| return f"[{state.sequence}] {orderbook}" |
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,201 @@ | ||||||||||||||||
| """ | ||||||||||||||||
| The stream client can be used to receive live updates to the orderbook. | ||||||||||||||||
| It also maintains a representation of the Luno orderbook correctly updated for each event. | ||||||||||||||||
|
|
||||||||||||||||
| For example usage see examples/stream.py | ||||||||||||||||
| """ | ||||||||||||||||
|
|
||||||||||||||||
| import asyncio | ||||||||||||||||
| from decimal import Decimal | ||||||||||||||||
| import json | ||||||||||||||||
| from typing import Callable, Dict, List | ||||||||||||||||
| import websockets | ||||||||||||||||
|
|
||||||||||||||||
| from .api_types import DEC_0, Order, MarketState, Pair | ||||||||||||||||
|
|
||||||||||||||||
| DEFAULT_URL = "wss://ws.luno.com" | ||||||||||||||||
|
|
||||||||||||||||
| StateUpdate = Callable[[Pair, MarketState, dict], None] | ||||||||||||||||
|
|
||||||||||||||||
| class OutOfOrderMessageException(Exception): | ||||||||||||||||
| pass | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| def _flatten_orders(orders, reverse): | ||||||||||||||||
| return sorted(orders.values(), key=lambda o: o.price, reverse=reverse) | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| def _decrement_trade(orders: Dict[str, Order], order_id: str, volume: Decimal): | ||||||||||||||||
| order = orders.pop(order_id, None) | ||||||||||||||||
| if order is None: | ||||||||||||||||
| return | ||||||||||||||||
|
|
||||||||||||||||
| new_order = order._replace(volume=order.volume - volume) | ||||||||||||||||
| if new_order.volume > DEC_0: | ||||||||||||||||
| orders[order_id] = new_order | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| class _MarketStreamState: | ||||||||||||||||
| def __init__(self, first: dict): | ||||||||||||||||
| if first is None: | ||||||||||||||||
| raise Exception("Unable to use empty message to initialise market state") | ||||||||||||||||
|
Check warning on line 41 in luno_python/stream_client.py
|
||||||||||||||||
|
|
||||||||||||||||
| def conv_message(msg): | ||||||||||||||||
| return Order( | ||||||||||||||||
| msg['id'], | ||||||||||||||||
| Decimal(msg['price']), | ||||||||||||||||
| Decimal(msg['volume']), | ||||||||||||||||
| ) | ||||||||||||||||
|
|
||||||||||||||||
| bids = [conv_message(m) for m in first['bids']] | ||||||||||||||||
| asks = [conv_message(m) for m in first['asks']] | ||||||||||||||||
| self._bids = {b.order_id: b for b in bids} | ||||||||||||||||
| self._asks = {a.order_id: a for a in asks} | ||||||||||||||||
| self._sequence = first['sequence'] | ||||||||||||||||
| self._trades = [] | ||||||||||||||||
| self._status = first['status'] | ||||||||||||||||
|
|
||||||||||||||||
| def get_asks(self): | ||||||||||||||||
| return _flatten_orders(self._asks, False) | ||||||||||||||||
|
|
||||||||||||||||
| def get_bids(self): | ||||||||||||||||
| return _flatten_orders(self._bids, True) | ||||||||||||||||
|
|
||||||||||||||||
| def get_status(self): | ||||||||||||||||
| return self._status | ||||||||||||||||
|
|
||||||||||||||||
| def get_sequence(self): | ||||||||||||||||
| return self._sequence | ||||||||||||||||
|
|
||||||||||||||||
| def get_snapshot(self): | ||||||||||||||||
| return MarketState( | ||||||||||||||||
| sequence=self.get_sequence(), | ||||||||||||||||
| asks=self.get_asks(), | ||||||||||||||||
| bids=self.get_bids(), | ||||||||||||||||
| status=self.get_status(), | ||||||||||||||||
| ) | ||||||||||||||||
|
|
||||||||||||||||
| def process_update(self, update: dict): | ||||||||||||||||
| if update is None: | ||||||||||||||||
| return | ||||||||||||||||
|
|
||||||||||||||||
| seq = update['sequence'] | ||||||||||||||||
| if int(seq) != int(self._sequence)+1: | ||||||||||||||||
| raise OutOfOrderMessageException() | ||||||||||||||||
|
|
||||||||||||||||
| trades = update.get('trade_updates') | ||||||||||||||||
| if trades: | ||||||||||||||||
| self._process_trades(trades) | ||||||||||||||||
|
|
||||||||||||||||
| create = update.get('create_update') | ||||||||||||||||
| if create: | ||||||||||||||||
| self._process_create(create) | ||||||||||||||||
|
|
||||||||||||||||
| delete_upd = update.get('delete_update') | ||||||||||||||||
| if delete_upd: | ||||||||||||||||
| self._process_delete(delete_upd) | ||||||||||||||||
|
|
||||||||||||||||
| status_upd = update.get('status_update') | ||||||||||||||||
| if status_upd: | ||||||||||||||||
| self._process_status(status_upd) | ||||||||||||||||
|
|
||||||||||||||||
| self._sequence = seq | ||||||||||||||||
|
|
||||||||||||||||
| def _process_trades(self, trade_updates: List[dict]): | ||||||||||||||||
| for t in trade_updates: | ||||||||||||||||
| maker_id = t['maker_order_id'] | ||||||||||||||||
| volume = Decimal(t['base']) | ||||||||||||||||
|
|
||||||||||||||||
| _decrement_trade(self._asks, maker_id, volume) | ||||||||||||||||
| _decrement_trade(self._bids, maker_id, volume) | ||||||||||||||||
|
|
||||||||||||||||
| def _process_create(self, create_update: dict): | ||||||||||||||||
| o = Order( | ||||||||||||||||
| create_update['order_id'], | ||||||||||||||||
| Decimal(create_update['price']), | ||||||||||||||||
| Decimal(create_update['volume']), | ||||||||||||||||
| ) | ||||||||||||||||
| if create_update['type'] == "ASK": | ||||||||||||||||
| self._asks[o.order_id] = o | ||||||||||||||||
| elif create_update['type'] == "BID": | ||||||||||||||||
| self._bids[o.order_id] = o | ||||||||||||||||
|
|
||||||||||||||||
| def _process_delete(self, delete_update: dict): | ||||||||||||||||
| order_id = delete_update['order_id'] | ||||||||||||||||
| self._asks.pop(order_id, None) | ||||||||||||||||
| self._bids.pop(order_id, None) | ||||||||||||||||
|
|
||||||||||||||||
| def _process_status(self, status_update: dict): | ||||||||||||||||
| self._status = status_update['status'] | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| async def _read_from_websocket(ws, pair: Pair, update_f: StateUpdate): | ||||||||||||||||
| state = None | ||||||||||||||||
| is_first = True | ||||||||||||||||
|
|
||||||||||||||||
| async for message in ws: | ||||||||||||||||
| try: | ||||||||||||||||
| body = json.loads(message) | ||||||||||||||||
| except ValueError: | ||||||||||||||||
| raise Exception(message) | ||||||||||||||||
|
Check warning on line 140 in luno_python/stream_client.py
|
||||||||||||||||
|
|
||||||||||||||||
| if body == "": # Empty update, used as keepalive | ||||||||||||||||
| body = None | ||||||||||||||||
|
|
||||||||||||||||
| if is_first: | ||||||||||||||||
| is_first = False | ||||||||||||||||
| state = _MarketStreamState(body) | ||||||||||||||||
| update_f(pair, state.get_snapshot(), None) | ||||||||||||||||
| continue | ||||||||||||||||
|
|
||||||||||||||||
| #could raise OutOfOrderMessageException | ||||||||||||||||
| state.process_update(body) | ||||||||||||||||
|
|
||||||||||||||||
| update_f(pair, state.get_snapshot(), body) | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| async def _write_keep_alive(ws): | ||||||||||||||||
| while True: | ||||||||||||||||
| await ws.send('""') | ||||||||||||||||
| await asyncio.sleep(60) | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| async def stream_market( | ||||||||||||||||
| pair: str, | ||||||||||||||||
| api_key_id: str, | ||||||||||||||||
| api_key_secret: str, | ||||||||||||||||
| update_callback: StateUpdate, | ||||||||||||||||
| base_url: str = DEFAULT_URL, | ||||||||||||||||
| ): | ||||||||||||||||
| """Opens a stream to /api/1/stream/... | ||||||||||||||||
|
|
||||||||||||||||
| Stream orderbook information and maintain an orderbook state. | ||||||||||||||||
|
|
||||||||||||||||
| :param pair: str Amount to buy or sell in the pair base currency. | ||||||||||||||||
| :param api_key_id: str | ||||||||||||||||
| :param api_key_secret: str | ||||||||||||||||
| :param update_callback: an StateUpdate function that will be called with new updates. | ||||||||||||||||
| """ | ||||||||||||||||
| if len(pair) != 6: | ||||||||||||||||
| raise Exception("Invalid pair") | ||||||||||||||||
|
Check warning on line 180 in luno_python/stream_client.py
|
||||||||||||||||
|
|
||||||||||||||||
| p = Pair(pair[:3].upper(), pair[3:].upper()) | ||||||||||||||||
| url = '/'.join([base_url, 'api/1/stream', p.base + p.counter]) | ||||||||||||||||
|
|
||||||||||||||||
| async with websockets.connect( | ||||||||||||||||
| url, | ||||||||||||||||
| origin='http://localhost/', | ||||||||||||||||
| ping_interval=None, | ||||||||||||||||
| max_size=2**21, | ||||||||||||||||
| ) as websocket: | ||||||||||||||||
|
|
||||||||||||||||
| auth = json.dumps({ | ||||||||||||||||
| 'api_key_id': api_key_id, | ||||||||||||||||
| 'api_key_secret': api_key_secret, | ||||||||||||||||
| }) | ||||||||||||||||
| await websocket.send(auth) | ||||||||||||||||
|
|
||||||||||||||||
| await asyncio.gather( | ||||||||||||||||
| _read_from_websocket(websocket, p, update_callback), | ||||||||||||||||
| _write_keep_alive(websocket), | ||||||||||||||||
| ) | ||||||||||||||||
|
Comment on lines
+198
to
+201
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Handle task cancellation when one coroutine fails. If Proposed fix for Python 3.11+- await asyncio.gather(
- _read_from_websocket(websocket, p, update_callback),
- _write_keep_alive(websocket),
- )
+ async with asyncio.TaskGroup() as tg:
+ tg.create_task(_read_from_websocket(websocket, p, update_callback))
+ tg.create_task(_write_keep_alive(websocket))Alternative fix for older Python versions+ reader_task = asyncio.create_task(_read_from_websocket(websocket, p, update_callback))
+ keepalive_task = asyncio.create_task(_write_keep_alive(websocket))
+ try:
+ await reader_task
+ finally:
+ keepalive_task.cancel()
+ try:
+ await keepalive_task
+ except asyncio.CancelledError:
+ pass📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Chain the exception and use a specific exception type.
When re-raising inside an
exceptclause, chain the original exception to preserve the traceback. Additionally, as flagged by static analysis, consider using a more specific exception type.Proposed fix
🧰 Tools
🪛 GitHub Check: SonarCloud Code Analysis
[warning] 140-140: Replace this generic exception class with a more specific one.
See more on https://sonarcloud.io/project/issues?id=luno_luno-python&issues=AZ0bGJLjBBDVKv5IdKf8&open=AZ0bGJLjBBDVKv5IdKf8&pullRequest=76
🪛 Ruff (0.15.6)
[warning] 140-140: Within an
exceptclause, raise exceptions withraise ... from errorraise ... from Noneto distinguish them from errors in exception handling(B904)
🤖 Prompt for AI Agents