Skip to content
This repository was archived by the owner on Jul 2, 2025. It is now read-only.

Commit 0748d3d

Browse files
committed
Add native retrial support to AmqpMessageQueue
- Add nativeRetrial option to AmqpMessageQueueOptions - When enabled, messages are not acknowledged on processing failure - Update documentation and examples in README - Add comprehensive tests for retrial behavior - Update dependencies to support new functionality fedify-dev/fedify#250
1 parent bc8a557 commit 0748d3d

File tree

7 files changed

+171
-46
lines changed

7 files changed

+171
-46
lines changed

README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,17 @@ const federation = createFederation({
3030
});
3131
~~~~
3232

33+
The `AmqpMessageQueue` constructor accepts options as the second
34+
parameter, which can be used to configure the message queue:
35+
36+
~~~~ typescript
37+
new AmqpMessageQueue(await connect("amqp://localhost"), {
38+
queue: "my_queue",
39+
})
40+
~~~~
41+
42+
For more details, please refer to the docs of [`AmqpMessageQueueOptions`].
43+
3344
[JSR]: https://jsr.io/@fedify/amqp
3445
[JSR badge]: https://jsr.io/badges/@fedify/amqp
3546
[npm]: https://www.npmjs.com/package/@fedify/amqp
@@ -41,6 +52,7 @@ const federation = createFederation({
4152
[`KvStore`]: https://jsr.io/@fedify/fedify/doc/federation/~/KvStore
4253
[`MessageQueue`]: https://jsr.io/@fedify/fedify/doc/federation/~/MessageQueue
4354
[`AmqpMessageQueue`]: https://jsr.io/@fedify/amqp/doc/mq/~/AmqpMessageQueue
55+
[`AmqpMessageQueueOptions`]: https://jsr.io/@fedify/amqp/doc/mq/~/AmqpMessageQueueOptions
4456

4557

4658
Installation
@@ -72,6 +84,9 @@ Changelog
7284

7385
To be released.
7486

87+
- Added `nativeRetrial` option to `AmqpMessageQueueOptions` to enable
88+
native retrial of messages.
89+
7590
- The type of the `AmqpMessageQueue()` constructor's first parameter has been
7691
changed from `Connection` to `ChannelModel`.
7792

deno.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
"./mq": "./src/mq.ts"
88
},
99
"imports": {
10-
"@alinea/suite": "jsr:@alinea/suite@^0.6.2",
11-
"@fedify/fedify": "jsr:@fedify/fedify@^1.5.0",
10+
"@fedify/fedify": "jsr:@fedify/fedify@^1.7.0-dev.887+013dabb5",
11+
"@hongminhee/suite": "jsr:@hongminhee/suite@^0.6.3",
1212
"@std/assert": "jsr:@std/assert@^1.0.13",
1313
"@std/async": "jsr:@std/async@^1.0.13",
1414
"amqplib": "npm:amqplib@^0.10.8",

deno.lock

Lines changed: 14 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,19 @@
3535
},
3636
"./package.json": "./package.json"
3737
},
38+
"peerDependencies": {
39+
"@fedify/fedify": "^1.7.0-dev.887",
40+
"amqplib": "^0.10.8"
41+
},
3842
"devDependencies": {
39-
"@alinea/suite": "^0.6.2",
43+
"@hongminhee/suite": "^0.6.3",
4044
"@js-temporal/polyfill": "^0.5.1",
4145
"@std/assert": "jsr:^1.0.13",
4246
"@std/async": "jsr:^1.0.13",
4347
"@types/amqplib": "^0.10.7",
4448
"tsdown": "^0.12.7",
4549
"typescript": "^5.8.3"
4650
},
47-
"peerDependencies": {
48-
"@fedify/fedify": "^1.5.0",
49-
"amqplib": "^0.10.8"
50-
},
5151
"scripts": {
5252
"build": "tsdown",
5353
"prepack": "tsdown",

pnpm-lock.yaml

Lines changed: 13 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/mq.test.ts

Lines changed: 80 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import { suite } from "@alinea/suite";
1+
import { suite } from "@hongminhee/suite";
22
import * as temporal from "@js-temporal/polyfill";
3-
import { assertEquals, assertGreater } from "@std/assert";
3+
import { assert, assertEquals, assertFalse, assertGreater } from "@std/assert";
44
import { delay } from "@std/async/delay";
55
// @deno-types="npm:@types/amqplib"
66
import { type ChannelModel, connect } from "amqplib";
@@ -21,13 +21,16 @@ function getConnection(): Promise<ChannelModel> {
2121
return connect(url ?? "amqp://localhost");
2222
}
2323

24-
test("AmqpMessageQueue", async () => {
24+
test("AmqpMessageQueue", {
25+
sanitizeOps: false,
26+
sanitizeExit: false,
27+
sanitizeResources: false,
28+
}, async () => {
2529
const conn = await getConnection();
2630
const conn2 = await getConnection();
27-
const queue = `fedify_queue_${Math.random().toString(36).slice(5)}`;
28-
const delayedQueuePrefix = `fedify_delayed_${
29-
Math.random().toString(36).slice(5)
30-
}_`;
31+
const randomSuffix = Math.random().toString(36).substring(2);
32+
const queue = `fedify_queue_${randomSuffix}`;
33+
const delayedQueuePrefix = `fedify_delayed_${randomSuffix}_`;
3134
const mq = new AmqpMessageQueue(conn, { queue, delayedQueuePrefix });
3235
const mq2 = new AmqpMessageQueue(conn2, { queue, delayedQueuePrefix });
3336

@@ -99,6 +102,76 @@ test("AmqpMessageQueue", async () => {
99102
await conn2.close();
100103
});
101104

105+
test(
106+
"AmqpMessageQueue [nativeRetrial: false]",
107+
{ sanitizeOps: false, sanitizeExit: false, sanitizeResources: false },
108+
async () => {
109+
const conn = await getConnection();
110+
const randomSuffix = Math.random().toString(36).substring(2);
111+
const queue = `fedify_queue_${randomSuffix}`;
112+
const delayedQueuePrefix = `fedify_delayed_${randomSuffix}_`;
113+
const mq = new AmqpMessageQueue(conn, { queue, delayedQueuePrefix });
114+
assertFalse(mq.nativeRetrial);
115+
116+
const controller = new AbortController();
117+
let i = 0;
118+
const listening = mq.listen((message: string) => {
119+
if (message !== "Hello, world!") return;
120+
if (i++ < 1) {
121+
throw new Error("Test error to check native retrial");
122+
}
123+
}, { signal: controller.signal });
124+
125+
await mq.enqueue("Hello, world!");
126+
127+
await waitFor(() => i >= 1, 15_000);
128+
assertEquals(i, 1);
129+
await delay(5_000);
130+
131+
controller.abort();
132+
await listening;
133+
await conn.close();
134+
135+
assertEquals(i, 1);
136+
},
137+
);
138+
139+
test(
140+
"AmqpMessageQueue [nativeRetrial: true]",
141+
{ sanitizeOps: false, sanitizeExit: false, sanitizeResources: false },
142+
async () => {
143+
const conn = await getConnection();
144+
const randomSuffix = Math.random().toString(36).substring(2);
145+
const queue = `fedify_queue_${randomSuffix}`;
146+
const delayedQueuePrefix = `fedify_delayed_${randomSuffix}_`;
147+
const mq = new AmqpMessageQueue(conn, {
148+
queue,
149+
delayedQueuePrefix,
150+
nativeRetrial: true,
151+
});
152+
assert(mq.nativeRetrial);
153+
154+
const controller = new AbortController();
155+
let i = 0;
156+
const listening = mq.listen((message: string) => {
157+
if (message !== "Hello, world!") return;
158+
if (i++ < 1) {
159+
throw new Error("Test error to check native retrial");
160+
}
161+
}, { signal: controller.signal });
162+
163+
await mq.enqueue("Hello, world!");
164+
165+
await waitFor(() => i > 1, 15_000);
166+
167+
controller.abort();
168+
await listening;
169+
await conn.close();
170+
171+
assertGreater(i, 1);
172+
},
173+
);
174+
102175
async function waitFor(
103176
predicate: () => boolean,
104177
timeoutMs: number,

0 commit comments

Comments
 (0)