Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions samples/bun-worker/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import run from "@absurd-sqlite/bun-worker";
import run, { Temporal } from "@absurd-sqlite/bun-worker";
import { Database } from "bun:sqlite";
import { existsSync, readdirSync } from "node:fs";
import { join } from "node:path";
Expand All @@ -19,7 +19,7 @@ await run(async (absurd) => {
return {};
});

await ctx.sleepFor("back off 15s", 15);
await ctx.sleepFor("back off 15s", Temporal.Duration.from({ seconds: 15 }));

await ctx.step("process", async () => {
console.log("process step");
Expand Down
5 changes: 4 additions & 1 deletion samples/typescript-client/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions samples/typescript-client/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Absurd, SQLiteConnection, SQLiteDatabase } from "@absurd-sqlite/sdk";
import { Absurd, SQLiteConnection, SQLiteDatabase, Temporal } from "@absurd-sqlite/sdk";
import sqlite from "better-sqlite3";

async function main() {
Expand Down Expand Up @@ -27,7 +27,7 @@ async function main() {
return {};
});

await ctx.sleepFor("back off 15s", 15);
await ctx.sleepFor("back off 15s", Temporal.Duration.from({ seconds: 15 }));

await ctx.step("process", async () => {
console.log("process step");
Expand Down
7 changes: 6 additions & 1 deletion sdks/bun-worker/bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions sdks/bun-worker/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@absurd-sqlite/bun-worker",
"version": "0.3.0-alpha.0",
"version": "0.3.0-alpha.1",
"description": "Bun worker utilities for Absurd-SQLite",
"type": "module",
"main": "dist/index.js",
Expand Down Expand Up @@ -39,7 +39,8 @@
"homepage": "https://github.com/b4fun/absurd-sqlite#readme",
"dependencies": {
"@absurd-sqlite/sdk": "next",
"cac": "^6.7.14"
"cac": "^6.7.14",
"temporal-polyfill": "^0.3.0"
},
"devDependencies": {
"bun-types": "^1.3.6",
Expand Down
1 change: 1 addition & 0 deletions sdks/bun-worker/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export type { WorkerOptions } from "@absurd-sqlite/sdk";
export {
downloadExtension,
type DownloadExtensionOptions,
Temporal,
} from "@absurd-sqlite/sdk";

/**
Expand Down
40 changes: 30 additions & 10 deletions sdks/bun-worker/src/sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type {
SQLiteStatement,
SQLiteValueCodec,
} from "@absurd-sqlite/sdk";
import { SQLiteConnection } from "@absurd-sqlite/sdk";
import { SQLiteConnection, Temporal } from "@absurd-sqlite/sdk";

export class BunSqliteConnection extends SQLiteConnection {
constructor(db: Database, options: SQLiteConnectionOptions = {}) {
Expand Down Expand Up @@ -115,6 +115,19 @@ function decodeRowValues<R extends object = any>(args: {
}) => unknown;
}): R {
const decodedRow: any = {};
if (args.columns && args.decodeColumn) {
for (const column of args.columns) {
const columnName = column.name;
const rawValue = args.row[columnName];
decodedRow[columnName] = args.decodeColumn({
value: rawValue,
columnName,
columnType: column.type,
});
}
return decodedRow as R;
}

for (const [columnName, rawValue] of Object.entries(args.row)) {
decodedRow[columnName] = decodeColumnValue({
value: rawValue,
Expand All @@ -132,21 +145,22 @@ function decodeColumnValue<V = any>(args: {
columnType: string | null;
verbose?: (...args: any[]) => void;
}): V | null {
const { value, columnName } = args;
const { value, columnName, columnType } = args;
if (value === null || value === undefined) {
return null;
}

if (isTimestampColumn(columnName)) {
if (typeof value === "number") {
return new Date(value) as V;
}
const isDateTime = columnType === "datetime" || isTimestampColumn(columnName);
if (isDateTime) {
if (typeof value === "string") {
const parsed = Date.parse(value);
if (!Number.isNaN(parsed)) {
return new Date(parsed) as V;
}
return Temporal.Instant.from(value) as V;
}
if (typeof value === "number") {
return Temporal.Instant.fromEpochMilliseconds(value) as V;
}
throw new Error(
`Expected datetime column ${columnName} to be a string or number, got ${typeof value}`
);
}

if (typeof value === "string") {
Expand All @@ -171,6 +185,12 @@ function tryDecodeJson<V = any>(value: string): V | null {
}

function encodeColumnValue(value: SQLiteBindValue): SQLiteBindValue {
if (value instanceof Temporal.Instant) {
return value.toString();
}
if (value instanceof Temporal.Duration) {
return value.toString();
}
if (value instanceof Date) {
return value.toISOString();
}
Expand Down
18 changes: 12 additions & 6 deletions sdks/bun-worker/test/basic.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
jest,
} from "bun:test";
import assert from "node:assert/strict";
import type { Absurd } from "@absurd-sqlite/sdk";
import { Temporal, type Absurd } from "@absurd-sqlite/sdk";
import { createTestAbsurd, randomName, type TestContext } from "./setup";
import { EventEmitter, once } from "events";
import { waitFor } from "./wait-for";
Expand Down Expand Up @@ -171,7 +171,7 @@ describe("Basic SDK Operations", () => {
const scheduledRun = await ctx.getRun(runID);
expect(scheduledRun).toMatchObject({
state: "sleeping",
available_at: wakeAt,
available_at: Temporal.Instant.fromEpochMilliseconds(wakeAt.getTime()),
wake_event: null,
});

Expand All @@ -189,7 +189,7 @@ describe("Basic SDK Operations", () => {
const resumedRun = await ctx.getRun(runID);
expect(resumedRun).toMatchObject({
state: "running",
started_at: wakeAt,
started_at: Temporal.Instant.fromEpochMilliseconds(wakeAt.getTime()),
});
});

Expand All @@ -216,7 +216,9 @@ describe("Basic SDK Operations", () => {
expect(running).toMatchObject({
state: "running",
claimed_by: "worker-a",
claim_expires_at: new Date(baseTime.getTime() + 30 * 1000),
claim_expires_at: Temporal.Instant.fromEpochMilliseconds(
baseTime.getTime() + 30 * 1000,
),
});

await ctx.setFakeNow(new Date(baseTime.getTime() + 5 * 60 * 1000));
Expand Down Expand Up @@ -275,7 +277,9 @@ describe("Basic SDK Operations", () => {
const runRow = await ctx.getRun(runID);
expect(runRow).toMatchObject({
claimed_by: "worker-clean",
claim_expires_at: new Date(base.getTime() + 60 * 1000),
claim_expires_at: Temporal.Instant.fromEpochMilliseconds(
base.getTime() + 60 * 1000,
),
});

const beforeTTL = new Date(finishTime.getTime() + 30 * 60 * 1000);
Expand Down Expand Up @@ -482,7 +486,9 @@ describe("Basic SDK Operations", () => {

const getExpiresAt = async (runID: string) => {
const run = await ctx.getRun(runID);
return run?.claim_expires_at ? run.claim_expires_at.getTime() : 0;
return run?.claim_expires_at
? run.claim_expires_at.epochMilliseconds
: 0;
};

absurd.workBatch("test-worker", claimTimeout);
Expand Down
19 changes: 13 additions & 6 deletions sdks/bun-worker/test/events.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { describe, test, expect, beforeAll, afterEach } from "bun:test";
import type { Absurd } from "@absurd-sqlite/sdk";
import { Temporal, type Absurd } from "@absurd-sqlite/sdk";
import { createTestAbsurd, randomName, type TestContext } from "./setup";
import { TimeoutError } from "@absurd-sqlite/sdk";

Expand All @@ -21,7 +21,9 @@ describe("Event system", () => {
const eventName = randomName("test_event");

absurd.registerTask({ name: "waiter" }, async (params, ctx) => {
const payload = await ctx.awaitEvent(eventName, { timeout: 60 });
const payload = await ctx.awaitEvent(eventName, {
timeout: Temporal.Duration.from({ seconds: 60 }),
});
return { received: payload };
});

Expand Down Expand Up @@ -86,7 +88,7 @@ describe("Event system", () => {
absurd.registerTask({ name: "timeout-waiter" }, async (_params, ctx) => {
try {
const payload = await ctx.awaitEvent(eventName, {
timeout: timeoutSeconds,
timeout: Temporal.Duration.from({ seconds: timeoutSeconds }),
});
return { timedOut: false, result: payload };
} catch (err) {
Expand All @@ -109,7 +111,9 @@ describe("Event system", () => {
wake_event: eventName,
});
const expectedWake = new Date(baseTime.getTime() + timeoutSeconds * 1000);
expect(sleepingRun?.available_at?.getTime()).toBe(expectedWake.getTime());
expect(sleepingRun?.available_at?.epochMilliseconds).toBe(
expectedWake.getTime(),
);

await ctx.setFakeNow(new Date(expectedWake.getTime() + 1000));
await absurd.workBatch("worker1", 120, 1);
Expand Down Expand Up @@ -170,13 +174,16 @@ describe("Event system", () => {

absurd.registerTask({ name: "timeout-no-loop" }, async (_params, ctx) => {
try {
await ctx.awaitEvent(eventName, { stepName: "wait", timeout: 10 });
await ctx.awaitEvent(eventName, {
stepName: "wait",
timeout: Temporal.Duration.from({ seconds: 10 }),
});
return { stage: "unexpected" };
} catch (err) {
if (err instanceof TimeoutError) {
const payload = await ctx.awaitEvent(eventName, {
stepName: "wait",
timeout: 10,
timeout: Temporal.Duration.from({ seconds: 10 }),
});
return { stage: "resumed", payload };
}
Expand Down
10 changes: 5 additions & 5 deletions sdks/bun-worker/test/retry.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { describe, test, expect, beforeAll, afterEach } from "bun:test";
import type { Absurd } from "@absurd-sqlite/sdk";
import { Temporal, type Absurd } from "@absurd-sqlite/sdk";
import { createTestAbsurd, randomName, type TestContext } from "./setup";

describe("Retry and cancellation", () => {
Expand Down Expand Up @@ -159,7 +159,7 @@ describe("Retry and cancellation", () => {
const { taskID } = await absurd.spawn("duration-cancel", undefined, {
maxAttempts: 4,
retryStrategy: { kind: "fixed", baseSeconds: 30 },
cancellation: { maxDuration: 90 },
cancellation: { maxDuration: Temporal.Duration.from({ seconds: 90 }) },
});

await absurd.workBatch("worker1", 60, 1);
Expand All @@ -185,7 +185,7 @@ describe("Retry and cancellation", () => {
});

const { taskID } = await absurd.spawn("delay-cancel", undefined, {
cancellation: { maxDelay: 60 },
cancellation: { maxDelay: Temporal.Duration.from({ seconds: 60 }) },
});

await ctx.setFakeNow(new Date(baseTime.getTime() + 61 * 1000));
Expand Down Expand Up @@ -312,8 +312,8 @@ describe("Retry and cancellation", () => {

await absurd.cancelTask(taskID);
const second = await ctx.getTask(taskID);
expect(second?.cancelled_at?.getTime()).toBe(
first?.cancelled_at?.getTime(),
expect(second?.cancelled_at?.epochMilliseconds).toBe(
first?.cancelled_at?.epochMilliseconds,
);
});

Expand Down
19 changes: 10 additions & 9 deletions sdks/bun-worker/test/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { join } from "node:path";
import { fileURLToPath } from "node:url";
import {
Absurd,
Temporal,
type AbsurdHooks,
type JsonValue,
} from "@absurd-sqlite/sdk";
Expand All @@ -23,8 +24,8 @@ export interface TaskRow {
retry_strategy: JsonValue | null;
max_attempts: number | null;
cancellation: JsonValue | null;
enqueue_at: Date;
first_started_at: Date | null;
enqueue_at: Temporal.Instant;
first_started_at: Temporal.Instant | null;
state:
| "pending"
| "running"
Expand All @@ -35,7 +36,7 @@ export interface TaskRow {
attempts: number;
last_attempt_run: string | null;
completed_payload: JsonValue | null;
cancelled_at: Date | null;
cancelled_at: Temporal.Instant | null;
}

export interface RunRow {
Expand All @@ -50,16 +51,16 @@ export interface RunRow {
| "failed"
| "cancelled";
claimed_by: string | null;
claim_expires_at: Date | null;
available_at: Date;
claim_expires_at: Temporal.Instant | null;
available_at: Temporal.Instant;
wake_event: string | null;
event_payload: JsonValue | null;
started_at: Date | null;
completed_at: Date | null;
failed_at: Date | null;
started_at: Temporal.Instant | null;
completed_at: Temporal.Instant | null;
failed_at: Temporal.Instant | null;
result: JsonValue | null;
failure_reason: JsonValue | null;
created_at: Date;
created_at: Temporal.Instant;
}

interface SqliteFixture {
Expand Down
Loading