1- import { Pipeline } from 'ioredis'
1+ import { ChainableCommander } from 'ioredis'
22import { RedisStream } from './stream.js'
33import mkDebug from 'debug'
44import { XBatchResult , XStreamResult } from './types.js'
@@ -21,6 +21,11 @@ export async function readAckDelete(
2121 ack ( pipeline , stream )
2222 read ( pipeline , stream )
2323 const responses = await pipeline . exec ( )
24+
25+ if ( ! responses ) {
26+ return
27+ }
28+
2429 //TODO NOGROUP the consumer group this client was blocked on no longer exists
2530 for ( const result of responses ) {
2631 if ( result [ 0 ] && ! result [ 0 ] ?. message . startsWith ( 'BUSYGROUP' ) ) {
@@ -42,7 +47,10 @@ export async function readAckDelete(
4247 }
4348}
4449
45- function ack ( client : Pipeline , { deleteOnAck, pendingAcks, group } : RedisStream < KindaAny > ) : void {
50+ function ack (
51+ client : ChainableCommander ,
52+ { deleteOnAck, pendingAcks, group } : RedisStream < KindaAny >
53+ ) : void {
4654 if ( ! group || ! pendingAcks . size ) return
4755 for ( const [ stream , ids ] of pendingAcks ) {
4856 client . xack ( stream , group , ...ids )
@@ -51,11 +59,14 @@ function ack(client: Pipeline, { deleteOnAck, pendingAcks, group }: RedisStream<
5159 pendingAcks . clear ( )
5260}
5361
54- function xgroup ( client : Pipeline , { group, streams, first } : RedisStream < KindaAny > ) : void {
62+ function xgroup (
63+ client : ChainableCommander ,
64+ { group, streams, first } : RedisStream < KindaAny >
65+ ) : void {
5566 if ( ! first || ! group ) return
5667 for ( const [ key , start ] of streams ) {
5768 debug ( `xgroup create ${ key } ${ group } ${ start } mkstream` )
58- client . xgroup ( 'create ' , key , group , start , 'mkstream ' )
69+ client . xgroup ( 'CREATE ' , key , group , start , 'MKSTREAM ' )
5970 }
6071}
6172
0 commit comments