Skip to content

Commit 0e8bacb

Browse files
authored
stream crypto orderbook (#597)
* stream crypto orderbook * flake8 format * sonarcloud check
1 parent bddd0a1 commit 0e8bacb

File tree

2 files changed

+100
-14
lines changed

2 files changed

+100
-14
lines changed

alpaca_trade_api/entity_v2.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,14 @@
8787
"t": "timestamp",
8888
}
8989

90+
orderbook_mapping_v2 = {
91+
"S": "symbol",
92+
"x": "exchange",
93+
"t": "timestamp",
94+
"b": "bids",
95+
"a": "asks",
96+
}
97+
9098

9199
class EntityListType(Enum):
92100
Trade = Trade, trade_mapping_v2
@@ -225,6 +233,22 @@ def __init__(self, raw):
225233
self[k] = _convert_or_none(QuoteV2, v)
226234

227235

236+
class BidOrAsk(Entity):
237+
def __init__(self, raw):
238+
super().__init__(raw)
239+
240+
241+
class OrderbookV2(Entity):
242+
def __init__(self, raw):
243+
super().__init__(raw)
244+
if self.bids:
245+
for i in range(len(self.bids)):
246+
self.bids[i] = BidOrAsk(self.bids[i])
247+
if self.asks:
248+
for i in range(len(self.asks)):
249+
self.asks[i] = BidOrAsk(self.asks[i])
250+
251+
228252
class NewsV2(Entity):
229253
def __init__(self, raw):
230254
super().__init__(raw)

alpaca_trade_api/stream.py

Lines changed: 76 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
luld_mapping_v2,
1919
cancel_error_mapping_v2,
2020
correction_mapping_v2,
21+
orderbook_mapping_v2,
2122
Trade,
2223
Quote,
2324
Bar,
@@ -26,6 +27,7 @@
2627
CancelErrorV2,
2728
CorrectionV2,
2829
NewsV2,
30+
OrderbookV2,
2931
)
3032

3133
log = logging.getLogger(__name__)
@@ -219,16 +221,7 @@ async def _unsubscribe(self,
219221
bars=(),
220222
updated_bars=(),
221223
daily_bars=()):
222-
if trades or quotes or bars or updated_bars or daily_bars:
223-
await self._ws.send(
224-
msgpack.packb({
225-
'action': 'unsubscribe',
226-
'trades': trades,
227-
'quotes': quotes,
228-
'bars': bars,
229-
'updatedBars': updated_bars,
230-
'dailyBars': daily_bars,
231-
}))
224+
raise NotImplementedError()
232225

233226
async def _run_forever(self):
234227
self._loop = asyncio.get_running_loop()
@@ -311,8 +304,9 @@ def unsubscribe_bars(self, *symbols):
311304

312305
def unsubscribe_updated_bars(self, *symbols):
313306
if self._running:
314-
asyncio.get_event_loop().run_until_complete(
315-
self._unsubscribe(updated_bars=symbols))
307+
asyncio.run_coroutine_threadsafe(
308+
self._unsubscribe(updated_bars=symbols),
309+
self._loop).result()
316310
for symbol in symbols:
317311
del self._handlers['updatedBars'][symbol]
318312

@@ -482,8 +476,63 @@ def __init__(self,
482476
raw_data=raw_data,
483477
websocket_params=websocket_params,
484478
)
479+
self._handlers['orderbooks'] = {}
485480
self._name = 'crypto data'
486481

482+
def _cast(self, msg_type, msg):
483+
result = super()._cast(msg_type, msg)
484+
if not self._raw_data:
485+
if msg_type == 'o':
486+
result = OrderbookV2({
487+
orderbook_mapping_v2[k]: v
488+
for k, v in msg.items() if k in orderbook_mapping_v2
489+
})
490+
return result
491+
492+
async def _dispatch(self, msg):
493+
msg_type = msg.get('T')
494+
symbol = msg.get('S')
495+
if msg_type == 'o':
496+
handler = self._handlers['orderbooks'].get(
497+
symbol, self._handlers['orderbooks'].get('*', None))
498+
if handler:
499+
await handler(self._cast(msg_type, msg))
500+
else:
501+
await super()._dispatch(msg)
502+
503+
async def _unsubscribe(self,
504+
trades=(),
505+
quotes=(),
506+
orderbooks=(),
507+
bars=(),
508+
updated_bars=(),
509+
daily_bars=()):
510+
if (
511+
trades or quotes or orderbooks or bars or updated_bars
512+
or daily_bars
513+
):
514+
await self._ws.send(
515+
msgpack.packb({
516+
'action': 'unsubscribe',
517+
'trades': trades,
518+
'quotes': quotes,
519+
'orderbooks': orderbooks,
520+
'bars': bars,
521+
'updatedBars': updated_bars,
522+
'dailyBars': daily_bars,
523+
}))
524+
525+
def subscribe_orderbooks(self, handler, *symbols):
526+
self._subscribe(handler, symbols, self._handlers['orderbooks'])
527+
528+
def unsubscribe_orderbooks(self, *symbols):
529+
if self._running:
530+
asyncio.run_coroutine_threadsafe(
531+
self._unsubscribe(orderbooks=symbols),
532+
self._loop).result()
533+
for symbol in symbols:
534+
del self._handlers['orderbooks'][symbol]
535+
487536

488537
class NewsDataStream(_DataStream):
489538
def __init__(self,
@@ -529,7 +578,7 @@ async def _unsubscribe(self, news=()):
529578
if news:
530579
await self._ws.send(
531580
msgpack.packb({
532-
'action': 'unsubscribe',
581+
'action': 'unsubscribe',
533582
'news': news,
534583
}))
535584

@@ -779,6 +828,9 @@ def subscribe_crypto_updated_bars(self, handler, *symbols):
779828
def subscribe_crypto_daily_bars(self, handler, *symbols):
780829
self._crypto_ws.subscribe_daily_bars(handler, *symbols)
781830

831+
def subscribe_crypto_orderbooks(self, handler, *symbols):
832+
self._crypto_ws.subscribe_orderbooks(handler, *symbols)
833+
782834
def subscribe_news(self, handler, *symbols):
783835
self._news_ws.subscribe_news(handler, *symbols)
784836

@@ -842,7 +894,7 @@ def decorator(func):
842894

843895
return decorator
844896

845-
def on_corrections(self, *symbols):
897+
def on_correction(self, *symbols):
846898
def decorator(func):
847899
self._data_ws.register_handler("corrections", func, *symbols)
848900
return func
@@ -884,6 +936,13 @@ def decorator(func):
884936

885937
return decorator
886938

939+
def on_crypto_orderbook(self, *symbols):
940+
def decorator(func):
941+
self.subscribe_crypto_orderbooks(func, *symbols)
942+
return func
943+
944+
return decorator
945+
887946
def on_news(self, *symbols):
888947
def decorator(func):
889948
self.subscribe_news(func, *symbols)
@@ -929,6 +988,9 @@ def unsubscribe_crypto_updated_bars(self, *symbols):
929988
def unsubscribe_crypto_daily_bars(self, *symbols):
930989
self._crypto_ws.unsubscribe_daily_bars(*symbols)
931990

991+
def unsubscribe_crypto_orderbooks(self, *symbols):
992+
self._crypto_ws.unsubscribe_orderbooks(*symbols)
993+
932994
def unsubscribe_news(self, *symbols):
933995
self._news_ws.unsubscribe_news(*symbols)
934996

0 commit comments

Comments
 (0)