diff --git a/README.md b/README.md index 6dd246f..dbcbdc0 100644 --- a/README.md +++ b/README.md @@ -81,9 +81,9 @@ stream.latestTradeDetail$.subscribe((v) => {}) - [ ] Delete Listen Key * Socket API * Market Data - - [ ] Subscribe Market Depth Data + - [x] Subscribe Market Depth Data - [x] Subscribe the Latest Trade Detail - - [ ] Subscribe K-Line Data + - [x] Subscribe K-Line Data * Account Data - [x] Listen Key expired push - [x] Account balance and position update push diff --git a/src/bingx-socket/bingx-market-socket-stream.ts b/src/bingx-socket/bingx-market-socket-stream.ts index e09940f..583b772 100644 --- a/src/bingx-socket/bingx-market-socket-stream.ts +++ b/src/bingx-socket/bingx-market-socket-stream.ts @@ -14,10 +14,15 @@ import { } from 'rxjs'; import { HeartbeatInterface } from 'bingx-api/bingx-socket/interfaces/heartbeat.interface'; import { + KlineEvent, + KlineInterval, LatestTradeEvent, MarkerSubscription, + MarketDepthEvent, + MarketDepthLevel, MarketWebsocketEvents, SubscriptionType, + TradingPair, } from 'bingx-api/bingx-socket/events/market-websocket-events'; export class BingxMarketSocketStream { @@ -28,6 +33,8 @@ export class BingxMarketSocketStream { public readonly onDisconnect$ = new Subject(); public readonly heartbeat$ = new ReplaySubject(1); public readonly latestTradeDetail$ = new Subject(); + public readonly marketDepth$ = new Subject(); + public readonly kline$ = new Subject(); constructor( url: URL = new URL('/swap-market', 'wss://open-api-swap.bingx.com'), @@ -53,6 +60,15 @@ export class BingxMarketSocketStream { event.dataType.includes('trade'), this.latestTradeDetail$, ), + filterAndEmitToSubject( + (event): event is MarketDepthEvent => + event.dataType.includes('@depth'), + this.marketDepth$, + ), + filterAndEmitToSubject( + (event): event is KlineEvent => event.dataType.includes('@kline_'), + this.kline$, + ), ) .subscribe(); @@ -92,4 +108,15 @@ export class BingxMarketSocketStream { public subscribe(dataType: SubscriptionType) { this.dataTypes$.next(dataType); } + + public subscribeMarketDepth( + symbol: TradingPair, + level: MarketDepthLevel = 20, + ) { + this.subscribe(`${symbol}@depth${level}`); + } + + public subscribeKline(symbol: TradingPair, interval: KlineInterval) { + this.subscribe(`${symbol}@kline_${interval}`); + } } diff --git a/src/bingx-socket/bingx-market-socket-subscriptions.spec.ts b/src/bingx-socket/bingx-market-socket-subscriptions.spec.ts new file mode 100644 index 0000000..b634baa --- /dev/null +++ b/src/bingx-socket/bingx-market-socket-subscriptions.spec.ts @@ -0,0 +1,126 @@ +import { Server, WebSocket } from 'ws'; +import { getPortFree } from 'bingx-api/get-port'; +import * as zlib from 'zlib'; +import { BingxMarketSocketStream } from 'bingx-api/bingx-socket/bingx-market-socket-stream'; +import { KlineEvent, MarketDepthEvent } from 'bingx-api/bingx-socket/events'; + +describe('bingx market socket subscriptions', () => { + let wss: Server; + let port: number; + let stream: BingxMarketSocketStream | undefined; + const sockets: WebSocket[] = []; + + const sendToSocket = (socket: WebSocket, msg: string) => { + zlib.gzip(msg, (err, result) => { + socket.send(result); + }); + }; + + beforeEach(async () => { + port = await getPortFree(); + wss = new Server({ port }); + await new Promise((resolve) => { + wss.on('listening', resolve); + wss.on('connection', (ws) => { + sockets[0] = ws; + ws.on('close', () => { + sockets.splice(0, 1); + }); + }); + }); + }); + + afterEach((done) => { + stream?.disconnect(); + wss.close(() => { + sockets.splice(0, sockets.length); + done(); + }); + }); + + it('subscribes to market depth and emits depth events', (done) => { + wss.once('connection', (socket) => { + socket.once('message', (message) => { + expect(JSON.parse(message.toString())).toStrictEqual({ + id: 'listen-for-BTC-USDT@depth5', + reqType: 'sub', + dataType: 'BTC-USDT@depth5', + }); + + sendToSocket( + socket, + JSON.stringify({ + code: 0, + dataType: 'BTC-USDT@depth5', + data: { + asks: [{ p: 30100.1, v: 2.5 }], + bids: [{ p: 30099.9, v: 1.25 }], + }, + } as MarketDepthEvent), + ); + }); + }); + + stream = new BingxMarketSocketStream(new URL('', `ws://0.0.0.0:${port}`)); + stream.marketDepth$.subscribe((event) => { + expect(event).toStrictEqual({ + code: '0', + dataType: 'BTC-USDT@depth5', + data: { + asks: [{ p: '30100.1', v: '2.5' }], + bids: [{ p: '30099.9', v: '1.25' }], + }, + }); + done(); + }); + + stream.subscribeMarketDepth('BTC-USDT', 5); + }); + + it('subscribes to kline and emits kline events', (done) => { + wss.once('connection', (socket) => { + socket.once('message', (message) => { + expect(JSON.parse(message.toString())).toStrictEqual({ + id: 'listen-for-BTC-USDT@kline_1m', + reqType: 'sub', + dataType: 'BTC-USDT@kline_1m', + }); + + sendToSocket( + socket, + JSON.stringify({ + code: 0, + dataType: 'BTC-USDT@kline_1m', + data: { + c: 30105, + h: 30120, + l: 30080, + o: 30090, + v: 42.5, + s: 'BTC-USDT', + }, + } as KlineEvent), + ); + }); + }); + + stream = new BingxMarketSocketStream(new URL('', `ws://0.0.0.0:${port}`)); + stream.kline$.subscribe((event) => { + expect(event).toStrictEqual({ + code: '0', + dataType: 'BTC-USDT@kline_1m', + data: { + c: '30105', + h: '30120', + l: '30080', + o: '30090', + v: '42.5', + s: 'BTC-USDT', + }, + }); + done(); + }); + + stream.subscribeKline('BTC-USDT', '1m'); + }); +}); diff --git a/src/bingx-socket/events/index.ts b/src/bingx-socket/events/index.ts index add0f81..974aa0b 100644 --- a/src/bingx-socket/events/index.ts +++ b/src/bingx-socket/events/index.ts @@ -4,6 +4,15 @@ export { SubscriptionType, LatestTradeEvent, TradeDataType, + MarketDepthDataType, + MarketDepthEvent, + MarketDepthData, + MarketDepthEntry, + MarketDepthLevel, + KlineDataType, + KlineEvent, + KlineData, + KlineInterval, MarketWebsocketEvents, Price, Volume, diff --git a/src/bingx-socket/events/market-websocket-events.ts b/src/bingx-socket/events/market-websocket-events.ts index 80e65d2..8bec78b 100644 --- a/src/bingx-socket/events/market-websocket-events.ts +++ b/src/bingx-socket/events/market-websocket-events.ts @@ -12,12 +12,34 @@ export enum MarkerWebsocketEventCode { export type TransactionTimeInMillis = number; export type TradingPair = `${string}-${string}`; export type IsMarketMaker = boolean; -export type Price = number; -export type Volume = number; +export type Price = string | number; +export type Volume = string | number; export type TradeDataType = `${TradingPair}@trade`; +export type MarketDepthLevel = 5 | 10 | 20 | 50 | 100; +export type MarketDepthDataType = `${TradingPair}@depth${MarketDepthLevel}`; +export type KlineInterval = + | '1m' + | '3m' + | '5m' + | '15m' + | '30m' + | '1h' + | '2h' + | '4h' + | '6h' + | '8h' + | '12h' + | '1d' + | '3d' + | '1w' + | '1M'; +export type KlineDataType = `${TradingPair}@kline_${KlineInterval}`; -export type SubscriptionType = TradeDataType; +export type SubscriptionType = + | TradeDataType + | MarketDepthDataType + | KlineDataType; export interface MarketWebsocketEvents { code: MarkerWebsocketEventCode; @@ -37,3 +59,34 @@ export interface LatestTradeEvent extends MarketWebsocketEvents { dataType: TradeDataType; data: TradeDetail[]; } + +export interface MarketDepthEntry { + p: Price; + v: Volume; +} + +export interface MarketDepthData { + asks: MarketDepthEntry[]; + bids: MarketDepthEntry[]; +} + +export interface MarketDepthEvent extends MarketWebsocketEvents { + code: MarkerWebsocketEventCode; + dataType: MarketDepthDataType; + data: MarketDepthData; +} + +export interface KlineData { + c: Price; + h: Price; + l: Price; + o: Price; + v: Volume; + s: TradingPair; +} + +export interface KlineEvent extends MarketWebsocketEvents { + code: MarkerWebsocketEventCode; + dataType: KlineDataType; + data: KlineData; +}