Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.

Commit c3c1dc0

Browse files
committed
feat(core): add abort controller & implement waitUntil
1 parent 804497f commit c3c1dc0

File tree

4 files changed

+72
-15
lines changed

4 files changed

+72
-15
lines changed

packages/core/src/actor/action.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,10 +156,17 @@ export class ActionContext<
156156
}
157157

158158
/**
159-
* Runs a promise in the background.
159+
* Prevents the actor from sleeping until promise is complete.
160160
*/
161-
runInBackground(promise: Promise<void>): void {
162-
this.#actorContext.runInBackground(promise);
161+
waitUntil(promise: Promise<void>): void {
162+
this.#actorContext.waitUntil(promise);
163+
}
164+
165+
/**
166+
* AbortSignal that fires when the actor is stopping.
167+
*/
168+
get abortSignal(): AbortSignal {
169+
return this.#actorContext.abortSignal;
163170
}
164171

165172
/**

packages/core/src/actor/config.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ export const ActorConfigSchema = z
6868
onStopTimeout: z.number().positive().default(5000),
6969
stateSaveInterval: z.number().positive().default(10_000),
7070
actionTimeout: z.number().positive().default(60_000),
71+
// Max time to wait for waitUntil background promises during shutdown
72+
waitUntilTimeout: z.number().positive().default(15_000),
7173
connectionLivenessTimeout: z.number().positive().default(2500),
7274
connectionLivenessInterval: z.number().positive().default(5000),
7375
noSleep: z.boolean().default(false),

packages/core/src/actor/context.ts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -145,12 +145,17 @@ export class ActorContext<
145145
}
146146

147147
/**
148-
* Runs a promise in the background.
149-
*
150-
* @param promise - The promise to run in the background.
148+
* Prevents the actor from sleeping until promise is complete.
149+
*/
150+
waitUntil(promise: Promise<void>): void {
151+
this.#actor._waitUntil(promise);
152+
}
153+
154+
/**
155+
* AbortSignal that fires when the actor is stopping.
151156
*/
152-
runInBackground(promise: Promise<void>): void {
153-
this.#actor._runInBackground(promise);
157+
get abortSignal(): AbortSignal {
158+
return this.#actor.abortSignal;
154159
}
155160

156161
/**

packages/core/src/actor/instance.ts

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ export class ActorInstance<
158158
#vars?: V;
159159

160160
#backgroundPromises: Promise<void>[] = [];
161+
#abortController = new AbortController();
161162
#config: ActorConfig<S, CP, CS, V, I, AD, DB>;
162163
#connectionDrivers!: ConnectionDriversMap;
163164
#actorDriver!: ActorDriver;
@@ -1050,6 +1051,9 @@ export class ActorInstance<
10501051

10511052
#assertReady() {
10521053
if (!this.#ready) throw new errors.InternalError("Actor not ready");
1054+
if (this.#sleepCalled)
1055+
throw new errors.InternalError("Actor is going to sleep");
1056+
if (this.#stopCalled) throw new errors.InternalError("Actor is stopping");
10531057
}
10541058

10551059
/**
@@ -1443,24 +1447,24 @@ export class ActorInstance<
14431447
}
14441448

14451449
/**
1446-
* Runs a promise in the background.
1450+
* Prevents the actor from sleeping until promise is complete.
14471451
*
14481452
* This allows the actor runtime to ensure that a promise completes while
14491453
* returning from an action request early.
14501454
*
14511455
* @param promise - The promise to run in the background.
14521456
*/
1453-
_runInBackground(promise: Promise<void>) {
1457+
_waitUntil(promise: Promise<void>) {
14541458
this.#assertReady();
14551459

14561460
// TODO: Should we force save the state?
14571461
// Add logging to promise and make it non-failable
14581462
const nonfailablePromise = promise
14591463
.then(() => {
1460-
logger().debug("background promise complete");
1464+
logger().debug("wait until promise complete");
14611465
})
14621466
.catch((error) => {
1463-
logger().error("background promise failed", {
1467+
logger().error("wait until promise failed", {
14641468
error: stringifyError(error),
14651469
});
14661470
});
@@ -1552,7 +1556,7 @@ export class ActorInstance<
15521556
return true;
15531557
}
15541558

1555-
/** Puts an actor to sleep. */
1559+
/** Puts an actor to sleep. This should just start the sleep sequence, most shutdown logic should be in _stop (which is called by the ActorDriver when sleeping). */
15561560
async _sleep() {
15571561
invariant(this.#sleepingSupported, "sleeping not supported");
15581562
invariant(this.#actorDriver.sleep, "no sleep on driver");
@@ -1581,6 +1585,11 @@ export class ActorInstance<
15811585

15821586
logger().info("actor stopping");
15831587

1588+
// Abort any listeners waiting for shutdown
1589+
try {
1590+
this.#abortController.abort();
1591+
} catch {}
1592+
15841593
// Call onStop lifecycle hook if defined
15851594
if (this.#config.onStop) {
15861595
try {
@@ -1601,6 +1610,9 @@ export class ActorInstance<
16011610
}
16021611
}
16031612

1613+
// Wait for any background tasks to finish, with timeout
1614+
await this.#waitBackgroundPromises(this.#config.options.waitUntilTimeout);
1615+
16041616
// Write state
16051617
await this.saveState({ immediate: true });
16061618

@@ -1625,8 +1637,39 @@ export class ActorInstance<
16251637
"timed out waiting for connections to close, shutting down anyway",
16261638
);
16271639
}
1640+
}
1641+
1642+
/** Abort signal that fires when the actor is stopping. */
1643+
get abortSignal(): AbortSignal {
1644+
return this.#abortController.signal;
1645+
}
1646+
1647+
/** Wait for background waitUntil promises with a timeout. */
1648+
async #waitBackgroundPromises(timeoutMs: number) {
1649+
const pending = this.#backgroundPromises;
1650+
if (pending.length === 0) {
1651+
logger().debug("no background promises");
1652+
return;
1653+
}
1654+
1655+
// Race promises with timeout to determine if pending promises settled fast enough
1656+
const timedOut = await Promise.race([
1657+
Promise.allSettled(pending).then(() => false),
1658+
new Promise<true>((resolve) =>
1659+
setTimeout(() => resolve(true), timeoutMs),
1660+
),
1661+
]);
16281662

1629-
// TODO:
1630-
//Deno.exit(0);
1663+
if (timedOut) {
1664+
logger().error(
1665+
"timed out waiting for background tasks, background promises may have leaked",
1666+
{
1667+
count: pending.length,
1668+
timeoutMs,
1669+
},
1670+
);
1671+
} else {
1672+
logger().debug("background promises finished");
1673+
}
16311674
}
16321675
}

0 commit comments

Comments
 (0)