Skip to content

Commit 20cacbc

Browse files
committed
fix: handle ending acks on quit
1 parent 895424b commit 20cacbc

File tree

2 files changed

+7
-15
lines changed

2 files changed

+7
-15
lines changed

src/stream.ts

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,12 @@ export class RedisStream {
212212
public async quit(): Promise<void> {
213213
if (!this.done) {
214214
this.done = true
215+
if (this.pendingAcks.size || this.readerId) {
216+
const pipeline = (this.control ? this.control : this.client).pipeline()
217+
this.pendingAcks.size && ack(pipeline, this)
218+
this.readerId && pipeline.client('UNBLOCK', this.readerId)
219+
await pipeline.exec()
220+
}
215221
if (!(this.createdConnection || this.createdControlConnection)) return
216222
await Promise.all([
217223
this.createdConnection && new Promise((resolve) => this.client.once('end', resolve)),
@@ -262,20 +268,6 @@ export class RedisStream {
262268
await this.maybeUnblock()
263269
}
264270

265-
/**
266-
* Immediately stop processing entries
267-
*/
268-
public async end() {
269-
if (this.control && this.readerId) {
270-
const pipeline = this.control.pipeline()
271-
ack(pipeline, this)
272-
pipeline.client('UNBLOCK', this.readerId)
273-
await Promise.all([pipeline.exec(), this.quit()])
274-
} else {
275-
await this.quit()
276-
}
277-
}
278-
279271
protected async return(): Promise<void> {
280272
await this.quit()
281273
}

src/xread.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ describe('redis-x-stream xread', () => {
116116
if (i === testEntries.length * 2 - 1) {
117117
setTimeout(() => {
118118
i++
119-
stream.end() //break;
119+
stream.quit() //break;
120120
}, 100)
121121
}
122122
}

0 commit comments

Comments
 (0)