Skip to content

Commit e9d3c18

Browse files
committed
feat: support addStream and drain during iteration
1 parent 558b2b9 commit e9d3c18

File tree

7 files changed

+163
-42
lines changed

7 files changed

+163
-42
lines changed

src/redis.ts

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import Redis, { ChainableCommander, RedisOptions } from 'ioredis'
22
import { RedisStream } from './stream.js'
33
import mkDebug from 'debug'
4-
import { XBatchResult, XStreamResult, env } from './types.js'
4+
import { XStreamResult, env } from './types.js'
55

66
const debug = mkDebug('redis-x-stream')
77

@@ -16,8 +16,15 @@ function isNumber(num: number | string | undefined): num is number {
1616
export async function readAckDelete(
1717
stream: RedisStream
1818
): Promise<IterableIterator<XStreamResult> | undefined> {
19-
const pipeline = stream.client.pipeline(),
20-
read = stream.group ? xreadgroup : xread
19+
const pipeline = stream.client.pipeline()
20+
21+
if (stream.draining) {
22+
ack(pipeline, stream)
23+
await pipeline.exec()
24+
return
25+
}
26+
27+
const read = stream.group ? xreadgroup : xread
2128
xgroup(pipeline, stream)
2229
ack(pipeline, stream)
2330
read(pipeline, stream)
@@ -27,17 +34,17 @@ export async function readAckDelete(
2734
return
2835
}
2936

30-
//TODO: NOGROUP the consumer group this client was blocked on no longer exists
37+
//TODO: FATAL - NOGROUP the consumer group this client was blocked on no longer exists
3138
for (const result of responses) {
3239
if (result[0] && !result[0]?.message.startsWith('BUSYGROUP')) {
3340
throw responses[0]
3441
}
3542
}
36-
const result = responses[responses.length - 1][1] as XBatchResult | null
43+
const result = responses[responses.length - 1][1] as XStreamResult[] | null
3744
if (!result) {
3845
return
3946
}
40-
if (read === xreadgroup) {
47+
if (stream.group) {
4148
for (const stream of result) {
4249
if (stream[1].length) {
4350
return result[Symbol.iterator]()
@@ -48,7 +55,10 @@ export async function readAckDelete(
4855
}
4956
}
5057

51-
function ack(client: ChainableCommander, { deleteOnAck, pendingAcks, group }: RedisStream): void {
58+
export function ack(
59+
client: ChainableCommander,
60+
{ deleteOnAck, pendingAcks, group }: RedisStream
61+
): void {
5262
if (!group || !pendingAcks.size) return
5363
for (const [stream, ids] of pendingAcks) {
5464
client.xack(stream, group, ...ids)
@@ -69,38 +79,29 @@ function xread(client: ChainableCommander, { block, count, streams, buffers }: R
6979
block = block === Infinity ? 0 : block
7080
const args: Parameters<typeof client['xread']> = ['COUNT', count] as IncrementalParameters
7181
if (isNumber(block)) args.unshift('BLOCK', block)
82+
args.push('STREAMS', ...streams.keys(), ...streams.values())
7283
debug(`xread ${args.join(' ')}`)
73-
client[buffers ? 'xreadBuffer' : 'xread'](
74-
...args,
75-
'STREAMS',
76-
...streams.keys(),
77-
...streams.values()
78-
)
84+
client[buffers ? 'xreadBuffer' : 'xread'](...args)
7985
}
8086

8187
function xreadgroup(
8288
client: ChainableCommander,
83-
{ block, count, group, consumer, noack, streams, buffers }: RedisStream
89+
{ block, count, first, group, consumer, noack, streams, buffers }: RedisStream
8490
): void {
8591
block = block === Infinity ? 0 : block
8692
const args: Parameters<typeof client['xreadgroup']> = [
8793
'GROUP',
8894
group as string,
8995
consumer as string,
90-
'COUNT',
91-
count.toString(),
9296
] as IncrementalParameters
97+
if (!first) args.push('COUNT', count.toString())
9398
if (noack) args.push('NOACK')
9499
if (isNumber(block)) args.push('BLOCK', block.toString())
100+
args.push('STREAMS', ...streams.keys(), ...streams.values())
95101
debug(`xreadgroup ${args.join(' ')}`)
96102
// eslint-disable-next-line @typescript-eslint/no-explicit-any
97103
// https://github.com/luin/ioredis/pull/1676#issue-1437398115
98-
;(client as any)[buffers ? 'xreadgroupBuffer' : 'xreadgroup'](
99-
...args,
100-
'STREAMS',
101-
...streams.keys(),
102-
...streams.values()
103-
)
104+
;(client as KindaAny)[buffers ? 'xreadgroupBuffer' : 'xreadgroup'](...args)
104105
}
105106

106107
export function createClient(options?: Redis | string | RedisOptions) {

src/stream.spec.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { describe, it, expect } from 'vitest'
12
import Redis from 'ioredis'
23
import redisStream from './stream'
34
import { rand } from './test.util.spec'
@@ -60,8 +61,8 @@ describe('RedisStream xread', () => {
6061
})
6162

6263
it('should manage initial and finished state', async () => {
63-
const stream = redisStream('my-stream' + rand())
64-
expect(stream.count).toEqual(100)
64+
const stream = redisStream({ streams: ['my-stream' + rand()], count: 2 })
65+
expect(stream.count).toEqual(2)
6566
expect(stream.noack).toEqual(false)
6667
expect(stream.block).toBeUndefined()
6768
expect(stream.deleteOnAck).toBeFalsy()
@@ -88,7 +89,7 @@ describe('RedisStream xread', () => {
8889
return Promise.all([str.quit(), arr.quit(), rec.quit()])
8990
})
9091

91-
it('should start at zero by default for xread ', () => {
92+
it('should start at zero by default for xread', () => {
9293
const myStream = 'key' + rand()
9394
const stream = redisStream(myStream)
9495
expect(stream.streams.get(myStream)).toEqual('0')

src/stream.ts

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { hostname } from 'node:os'
2-
import { createClient, readAckDelete } from './redis.js'
2+
import { ack, createClient, readAckDelete } from './redis.js'
33
import {
44
RedisStreamOptions,
55
RedisClient,
@@ -31,6 +31,7 @@ export class RedisStream {
3131
public readonly control?: RedisClient
3232
public readonly group?: string
3333
public readonly consumer?: string
34+
public readonly blocked: boolean = false
3435

3536
//xread options
3637
public streams: Map<string, string>
@@ -55,6 +56,12 @@ export class RedisStream {
5556
*/
5657
public done = false
5758
public first = false
59+
public draining = false
60+
private unblocked = false
61+
private reading = false
62+
63+
private readerId: number | null = null
64+
private pendingId: Promise<number | null> | null = null
5865

5966
private itr = {
6067
name: '',
@@ -88,6 +95,7 @@ export class RedisStream {
8895
}
8996

9097
if (this.block === 0 || this.block === Infinity) {
98+
this.blocked = true
9199
const { client, created } = createClient(options.redisControl)
92100
this.control = client
93101
this.createdControlConnection = created
@@ -101,6 +109,13 @@ export class RedisStream {
101109
this.createdConnection = created
102110
this.client = client
103111

112+
if (this.blocked) {
113+
this.pendingId = this.client.client('ID').then(
114+
(id) => (this.readerId = id),
115+
() => (this.pendingId = this.readerId = null)
116+
)
117+
}
118+
104119
if (options.consumer || options.group) {
105120
if (!options.group) {
106121
this.group = '_xs_g_' + options.consumer
@@ -152,9 +167,15 @@ export class RedisStream {
152167
this.itr.prev = this.ack(this.itr.name, this.itr.prev[0].toString())
153168
}
154169
if (!itr.stream) {
170+
this.reading = true
155171
itr.stream = await readAckDelete(this)
172+
this.reading = false
156173
}
157174
if (!itr.stream && !this.first) {
175+
if (this.unblocked) {
176+
this.unblocked = false
177+
continue
178+
}
158179
return this.quit()
159180
}
160181
if (this.first) {
@@ -180,6 +201,7 @@ export class RedisStream {
180201
if (result.done) {
181202
itr.entry = null
182203
} else {
204+
//TODO add test case for this '>'
183205
this.streams.set(itr.name, this.group ? '>' : result.value[0].toString())
184206
if (this.ackOnIterate) itr.prev = result.value
185207
yield [itr.name, result.value]
@@ -211,6 +233,47 @@ export class RedisStream {
211233
return
212234
}
213235

236+
private async maybeUnblock() {
237+
if (this.reading && !this.done) {
238+
if (typeof this.readerId !== 'number') {
239+
await this.pendingId
240+
this.pendingId = null
241+
}
242+
if (typeof this.readerId !== 'number') {
243+
throw new Error('Unable to read client id')
244+
}
245+
this.unblocked = true
246+
await this.control?.client('UNBLOCK', this.readerId)
247+
}
248+
}
249+
250+
public async addStream(streamName: string) {
251+
this.streams.set(streamName, '0')
252+
await this.maybeUnblock()
253+
}
254+
255+
/**
256+
* Iterate through remaining items in the PEL and exit
257+
*/
258+
public async drain() {
259+
this.draining = true
260+
await this.maybeUnblock()
261+
}
262+
263+
/**
264+
* Immediately stop processing entries
265+
*/
266+
public async end() {
267+
if (this.control && this.readerId) {
268+
const pipeline = this.control.pipeline()
269+
ack(pipeline, this)
270+
pipeline.client('UNBLOCK', this.readerId)
271+
await Promise.all([pipeline.exec(), this.quit()])
272+
} else {
273+
await this.quit()
274+
}
275+
}
276+
214277
protected async return(): Promise<void> {
215278
await this.quit()
216279
}

src/test.util.spec.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
import { RedisClient, StreamEntry, XEntryResult } from './types.js'
2+
import mkDebug from 'debug'
3+
4+
const debug = mkDebug('redis-x-stream')
25

36
const delay = (ms: number): Promise<void> => new Promise((resolve) => setTimeout(resolve, ms)),
4-
times = <T>(count: number, fn: () => T): Array<T> => Array.from(Array(count), fn) as T[],
7+
times = <T>(count: number, fn: (_: undefined, i: number) => T): Array<T> =>
8+
Array.from(Array(count), fn) as T[],
59
quit = async (client: RedisClient): Promise<void> => {
610
await client.quit()
711
return new Promise((resolve) => client.once('end', resolve))
812
},
13+
randNum = (min: number, max: number) => Math.floor(Math.random() * (max - min) + min),
914
rand = (): string => Math.random().toString(36).slice(6),
1015
drain = async (iterable: AsyncIterable<XEntryResult>): Promise<Map<string, StreamEntry[]>> => {
1116
const results = new Map<string, StreamEntry[]>()
@@ -17,15 +22,12 @@ const delay = (ms: number): Promise<void> => new Promise((resolve) => setTimeout
1722
return results
1823
},
1924
redisIdRegex = /\d+-\d/,
20-
testEntries = [
21-
['1', 'hi'],
22-
['2', 'hello'],
23-
['3', 'hai'],
24-
]
25+
testEntries = times(randNum(7, 23), (_, i) => [i.toString(), rand()])
2526
async function hydrateForTest(writer: RedisClient, stream: string, ...values: string[][]) {
2627
if (!values.length) values = testEntries
2728
const pipeline = writer.pipeline()
2829
for (const [key, value] of values) {
30+
debug(`xadd ${stream} * ${key} ${value}`)
2931
pipeline.xadd(stream, '*', key, value)
3032
}
3133
await pipeline.exec()

src/types.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ export type StreamEntry = [StreamEntryId, StreamEntryKeyValues]
99
export type XEntryResult = [StreamKey, StreamEntry]
1010
//Result Types from iterator
1111
export type XStreamResult = [StreamKey, StreamEntry[]]
12-
export type XBatchResult = XStreamResult[]
1312

1413
//Buffers
1514
export type StreamEntryKeyValueBuffers = Buffer[]

src/xread.spec.ts

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import Redis from 'ioredis'
2+
import { describe, it, expect, beforeAll, afterAll, beforeEach, afterEach } from 'vitest'
23
import { RedisStream } from './stream.js'
34
import { RedisClient } from './types.js'
45
import { delay, hydrateForTest, quit, times, testEntries, redisIdRegex } from './test.util.spec.js'
@@ -15,7 +16,6 @@ describe('redis-x-stream xread', () => {
1516
writer = new Redis()
1617
})
1718
afterAll(() => quit(writer))
18-
1919
beforeEach(() => {
2020
prefix = Math.random().toString(36).slice(6) + '_'
2121
reader = new Redis()
@@ -76,9 +76,10 @@ describe('redis-x-stream xread', () => {
7676
const stream = new RedisStream(streamName)
7777
const values = await hydrateForTest(writer, streamName)
7878
let ackAttempts = 0
79-
for await (const [_, [id, __]] of stream) {
79+
for await (const [_, [id]] of stream) {
80+
void _
8081
try {
81-
await stream.ack(id)
82+
stream.ack(id)
8283
} catch (e: unknown) {
8384
if (e instanceof Error) {
8485
ackAttempts++
@@ -90,6 +91,36 @@ describe('redis-x-stream xread', () => {
9091
expect(ackAttempts).toBe(values.length)
9192
})
9293

94+
it('should allow adding a stream on a blocked iterable', async () => {
95+
const myStream = key('my-stream')
96+
const laterStream = key('later-stream')
97+
await hydrateForTest(writer, myStream)
98+
await hydrateForTest(writer, laterStream)
99+
const stream = new RedisStream({ streams: [myStream], block: Infinity })
100+
let i = 0
101+
for await (const [streamName, _] of stream) {
102+
i++
103+
if (i === testEntries.length) {
104+
expect(streamName).toEqual(myStream)
105+
setTimeout(() => {
106+
//TODO: expect stream is blocked?
107+
stream.addStream(laterStream)
108+
})
109+
}
110+
if (i > testEntries.length) {
111+
expect(streamName).toEqual(laterStream)
112+
}
113+
if (i === testEntries.length * 2) {
114+
setTimeout(() => {
115+
i++
116+
stream.end() //break;
117+
})
118+
}
119+
}
120+
//stream will block indefinitely (i++ in the future to assert after loop)
121+
expect(i).toEqual(testEntries.length * 2 + 1)
122+
})
123+
93124
it('should not allow re-iteration (done is set)', async () => {
94125
const streamName = key('my-stream'),
95126
iterable = new RedisStream(streamName)

0 commit comments

Comments
 (0)