From 5322434f9cebebbfbd2739b61d8c5587ee61c2e5 Mon Sep 17 00:00:00 2001 From: Benjamin Shafii Date: Mon, 1 Jun 2026 23:02:55 -0700 Subject: [PATCH 1/3] feat(den): seed providers in Daytona workers --- ee/apps/den-api/package.json | 1 + .../scripts/daytona-provider-seed-smoke.ts | 133 ++++++++++++++++++ ee/apps/den-api/src/routes/workers/core.ts | 13 +- ee/apps/den-api/src/routes/workers/shared.ts | 12 +- .../workers/daytona-provider-seed-loader.ts | 40 ++++++ .../src/workers/daytona-provider-seed.ts | 95 +++++++++++++ ee/apps/den-api/src/workers/daytona.ts | 29 +++- ee/apps/den-api/src/workers/provisioner.ts | 4 + .../test/daytona-provider-seed.test.ts | 75 ++++++++++ 9 files changed, 391 insertions(+), 11 deletions(-) create mode 100644 ee/apps/den-api/scripts/daytona-provider-seed-smoke.ts create mode 100644 ee/apps/den-api/src/workers/daytona-provider-seed-loader.ts create mode 100644 ee/apps/den-api/src/workers/daytona-provider-seed.ts create mode 100644 ee/apps/den-api/test/daytona-provider-seed.test.ts diff --git a/ee/apps/den-api/package.json b/ee/apps/den-api/package.json index 830ec53628..752aa6e542 100644 --- a/ee/apps/den-api/package.json +++ b/ee/apps/den-api/package.json @@ -10,6 +10,7 @@ "build:den-db": "pnpm --filter @openwork-ee/den-db build", "backfill:desktop-policies": "pnpm run build:den-db && tsx scripts/backfill-desktop-policies.ts", "seed:demo-org": "pnpm run build:den-db && sh -lc 'DEN_WEB_PORT=${DEN_WEB_PORT:-3005}; OPENWORK_DEV_MODE=${OPENWORK_DEV_MODE:-1} DATABASE_URL=${DATABASE_URL:-mysql://root:password@127.0.0.1:3306/openwork_den} DEN_DB_ENCRYPTION_KEY=${DEN_DB_ENCRYPTION_KEY:-local-dev-db-encryption-key-please-change-1234567890} BETTER_AUTH_SECRET=${BETTER_AUTH_SECRET:-local-dev-secret-not-for-production-use!!} BETTER_AUTH_URL=${BETTER_AUTH_URL:-http://localhost:$DEN_WEB_PORT} tsx scripts/seed-demo-org.ts'", + "smoke:daytona-provider-seed": "tsx scripts/daytona-provider-seed-smoke.ts", "start": "node dist/server.js" }, "dependencies": { diff --git a/ee/apps/den-api/scripts/daytona-provider-seed-smoke.ts b/ee/apps/den-api/scripts/daytona-provider-seed-smoke.ts new file mode 100644 index 0000000000..82edce6a28 --- /dev/null +++ b/ee/apps/den-api/scripts/daytona-provider-seed-smoke.ts @@ -0,0 +1,133 @@ +import { Daytona } from "@daytonaio/sdk" +import { + buildDaytonaProviderSeed, + buildDaytonaProviderSeedScript, + buildShellEnvAssignments, + shellQuote, +} from "../src/workers/daytona-provider-seed.js" + +function requiredEnv(name: string) { + const value = process.env[name]?.trim() + if (!value) { + throw new Error(`${name} is required`) + } + return value +} + +function optionalEnv(name: string, fallback: string) { + const value = process.env[name]?.trim() + return value ? value : fallback +} + +function slug(value: string) { + return value + .toLowerCase() + .replace(/[^a-z0-9-]+/g, "-") + .replace(/-+/g, "-") + .replace(/^-|-$/g, "") +} + +async function main() { + const apiKey = requiredEnv("DAYTONA_API_KEY") + const apiUrl = optionalEnv("DAYTONA_API_URL", "https://app.daytona.io/api") + const target = process.env.DAYTONA_TARGET?.trim() + const snapshot = process.env.DAYTONA_SNAPSHOT?.trim() + const image = optionalEnv("DAYTONA_SANDBOX_IMAGE", "node:22-bookworm-slim") + const createTimeoutSeconds = Number(process.env.DAYTONA_CREATE_TIMEOUT_SECONDS ?? "300") + const deleteTimeoutSeconds = Number(process.env.DAYTONA_DELETE_TIMEOUT_SECONDS ?? "120") + const commandTimeoutSeconds = Number(process.env.DAYTONA_SMOKE_COMMAND_TIMEOUT_SECONDS ?? "120") + const name = slug(`den-provider-seed-smoke-${Date.now().toString(36)}`).slice(0, 63) + + const daytona = new Daytona({ + apiKey, + apiUrl, + ...(target ? { target } : {}), + }) + + const common = { + name, + public: false, + autoStopInterval: 0, + autoArchiveInterval: 0, + autoDeleteInterval: 0, + ephemeral: true, + labels: { + "openwork.den.provider": "daytona", + "openwork.den.type": "provider-seed-smoke", + }, + envVars: { + DEN_RUNTIME_PROVIDER: "daytona-provider-seed-smoke", + }, + resources: { + cpu: 1, + memory: 1, + disk: 4, + }, + } + + const sandbox = await daytona.create( + snapshot ? { ...common, snapshot } : { ...common, image }, + { timeout: createTimeoutSeconds }, + ) + + try { + const smokeKey = "ow_inf_smoke_key" + const seed = buildDaytonaProviderSeed([ + { + providerId: "openwork", + providerConfig: { + id: "openwork", + name: "OpenWork", + npm: "@openrouter/ai-sdk-provider", + env: ["OPENWORK_API_KEY"], + api: "https://inference.openwork.test/api/v1", + options: { + baseURL: "https://inference.openwork.test/api/v1", + }, + }, + apiKey: smokeKey, + }, + ]) + + const configPath = "/tmp/openwork-daytona-provider-seed/opencode.jsonc" + const validateConfigScript = [ + 'const fs = require("node:fs")', + 'const config = JSON.parse(fs.readFileSync(process.argv[1], "utf8"))', + 'if (!config.provider || !config.provider.openwork) throw new Error("openwork_provider_missing")', + `if (JSON.stringify(config).includes(${JSON.stringify(smokeKey)})) throw new Error("api_key_written_to_config")`, + ].join("; ") + const validateEnvScript = `if (process.env.OPENWORK_API_KEY !== ${JSON.stringify(smokeKey)}) throw new Error("openwork_api_key_env_missing")` + const commandScript = [ + "set -eu", + buildDaytonaProviderSeedScript({ configPath, seed }), + `node -e ${shellQuote(validateConfigScript)} ${shellQuote(configPath)}`, + `${buildShellEnvAssignments(seed?.env ?? {})} node -e ${shellQuote(validateEnvScript)}`, + "command -v opencode >/dev/null 2>&1 || { echo 'opencode missing; set DAYTONA_SNAPSHOT to the OpenWork runtime snapshot' >&2; exit 2; }", + "opencode --version", + ].join("\n") + + const result = await sandbox.process.executeCommand( + `sh -lc ${shellQuote(commandScript)}`, + undefined, + undefined, + commandTimeoutSeconds, + ) + + if (result.exitCode !== 0) { + throw new Error(result.result?.trim() || `smoke command exited with ${result.exitCode}`) + } + + console.log(JSON.stringify({ ok: true, sandboxId: sandbox.id, output: result.result?.trim() ?? "" }, null, 2)) + } finally { + await sandbox.delete(deleteTimeoutSeconds).catch((error) => { + const message = error instanceof Error ? error.message : "unknown_error" + console.warn(`[daytona-smoke] failed to delete sandbox ${sandbox.id}: ${message}`) + }) + } +} + +main().catch((error) => { + const message = error instanceof Error ? error.message : "unknown_error" + console.error(message) + process.exit(1) +}) diff --git a/ee/apps/den-api/src/routes/workers/core.ts b/ee/apps/den-api/src/routes/workers/core.ts index 2d0be1fe80..b86bdf041d 100644 --- a/ee/apps/den-api/src/routes/workers/core.ts +++ b/ee/apps/den-api/src/routes/workers/core.ts @@ -5,9 +5,10 @@ import type { Hono } from "hono" import { describeRoute } from "hono-openapi" import { z } from "zod" import { db } from "../../db.js" -import { jsonValidator, paramValidator, queryValidator, requireUserMiddleware, resolveUserOrganizationsMiddleware } from "../../middleware/index.js" +import { jsonValidator, paramValidator, queryValidator, requireUserMiddleware, resolveOrganizationContextMiddleware, resolveUserOrganizationsMiddleware } from "../../middleware/index.js" import { denTypeIdSchema, emptyResponse, forbiddenSchema, invalidRequestSchema, jsonResponse, notFoundSchema, unauthorizedSchema } from "../../openapi.js" import { getOrganizationLimitStatus } from "../../organization-limits.js" +import type { OrganizationContext } from "../../orgs.js" import { getRequiredUserEmail } from "../../user.js" import type { WorkerRouteVariables } from "./shared.js" import { @@ -186,11 +187,12 @@ export function registerWorkerCoreRoutes { const user = c.get("user") const orgId = c.get("activeOrganizationId") + const organizationContext: OrganizationContext = c.get("organizationContext") const input = c.req.valid("json") if (!orgId) { @@ -273,12 +275,19 @@ export function registerWorkerCoreRoutes team.memberIds.includes(organizationContext.currentMember.id)) + .map((team) => team.id) + void continueCloudProvisioning({ workerId, name: input.name, hostToken, clientToken, activityToken, + organizationId: organizationContext.organization.id, + memberId: organizationContext.currentMember.id, + memberTeamIds, }) } diff --git a/ee/apps/den-api/src/routes/workers/shared.ts b/ee/apps/den-api/src/routes/workers/shared.ts index cbf1f01d25..58228ce281 100644 --- a/ee/apps/den-api/src/routes/workers/shared.ts +++ b/ee/apps/den-api/src/routes/workers/shared.ts @@ -15,10 +15,10 @@ import { z } from "zod" import { requireCloudWorkerAccess } from "../../billing/polar.js" import { db } from "../../db.js" import { env } from "../../env.js" -import type { UserOrganizationsContext } from "../../middleware/index.js" +import type { OrganizationContextVariables, UserOrganizationsContext } from "../../middleware/index.js" import { denTypeIdSchema } from "../../openapi.js" import type { AuthContextVariables } from "../../session.js" -import { deprovisionWorker, provisionWorker } from "../../workers/provisioner.js" +import { deprovisionWorker, provisionWorker, type ProvisionInput } from "../../workers/provisioner.js" import { customDomainForWorker } from "../../workers/vanity-domain.js" export const createWorkerSchema = z.object({ @@ -49,7 +49,7 @@ export const workerIdParamSchema = z.object({ id: denTypeIdSchema("worker"), }) -export type WorkerRouteVariables = AuthContextVariables & Partial +export type WorkerRouteVariables = AuthContextVariables & Partial & Partial type WorkerRow = typeof WorkerTable.$inferSelect type WorkerInstanceRow = typeof WorkerInstanceTable.$inferSelect @@ -325,6 +325,9 @@ export async function continueCloudProvisioning(input: { hostToken: string clientToken: string activityToken: string + organizationId?: ProvisionInput["organizationId"] + memberId?: ProvisionInput["memberId"] + memberTeamIds?: ProvisionInput["memberTeamIds"] }) { try { const provisioned = await provisionWorker({ @@ -333,6 +336,9 @@ export async function continueCloudProvisioning(input: { hostToken: input.hostToken, clientToken: input.clientToken, activityToken: input.activityToken, + organizationId: input.organizationId, + memberId: input.memberId, + memberTeamIds: input.memberTeamIds, }) await db diff --git a/ee/apps/den-api/src/workers/daytona-provider-seed-loader.ts b/ee/apps/den-api/src/workers/daytona-provider-seed-loader.ts new file mode 100644 index 0000000000..3a77d47586 --- /dev/null +++ b/ee/apps/den-api/src/workers/daytona-provider-seed-loader.ts @@ -0,0 +1,40 @@ +import { and, eq, inArray, or } from "@openwork-ee/den-db/drizzle" +import { LlmProviderAccessTable, LlmProviderTable } from "@openwork-ee/den-db/schema" +import { db } from "../db.js" +import { buildDaytonaProviderSeed } from "./daytona-provider-seed.js" + +type ProviderSeedMembership = { + organizationId: typeof LlmProviderTable.$inferSelect.organizationId + memberId: NonNullable + teamIds: Array> +} + +export type DaytonaProviderSeedMembership = ProviderSeedMembership + +export async function loadMemberDaytonaProviderSeed(input: ProviderSeedMembership) { + const teamIds = [...new Set(input.teamIds)] + const accessWhere = teamIds.length > 0 + ? and( + eq(LlmProviderTable.organizationId, input.organizationId), + or( + eq(LlmProviderAccessTable.orgMembershipId, input.memberId), + inArray(LlmProviderAccessTable.teamId, teamIds), + ), + ) + : and( + eq(LlmProviderTable.organizationId, input.organizationId), + eq(LlmProviderAccessTable.orgMembershipId, input.memberId), + ) + + const rows = await db + .select({ + providerId: LlmProviderTable.providerId, + providerConfig: LlmProviderTable.providerConfig, + apiKey: LlmProviderTable.apiKey, + }) + .from(LlmProviderAccessTable) + .innerJoin(LlmProviderTable, eq(LlmProviderAccessTable.llmProviderId, LlmProviderTable.id)) + .where(accessWhere) + + return buildDaytonaProviderSeed(rows) +} diff --git a/ee/apps/den-api/src/workers/daytona-provider-seed.ts b/ee/apps/den-api/src/workers/daytona-provider-seed.ts new file mode 100644 index 0000000000..f3f8d5d27e --- /dev/null +++ b/ee/apps/den-api/src/workers/daytona-provider-seed.ts @@ -0,0 +1,95 @@ +export type DaytonaLlmProviderSeedSource = { + providerId: string + providerConfig: Record + apiKey: string | null +} + +export type DaytonaProviderSeed = { + provider: Record> + env: Record +} + +const shellEnvNamePattern = /^[A-Za-z_][A-Za-z0-9_]*$/ + +export function shellQuote(value: string) { + return `'${value.replace(/'/g, `'"'"'`)}'` +} + +function providerEnvNames(providerConfig: Record) { + const env = providerConfig.env + if (!Array.isArray(env)) { + return [] + } + + return env.filter((value): value is string => ( + typeof value === "string" && shellEnvNamePattern.test(value) + )) +} + +export function buildDaytonaProviderSeed(providers: DaytonaLlmProviderSeedSource[]) { + const seed: DaytonaProviderSeed = { + provider: {}, + env: {}, + } + + for (const provider of providers) { + const providerId = provider.providerId.trim() + if (!providerId) { + continue + } + + seed.provider[providerId] = provider.providerConfig + + if (!provider.apiKey) { + continue + } + + for (const envName of providerEnvNames(provider.providerConfig)) { + seed.env[envName] = provider.apiKey + } + } + + if (Object.keys(seed.provider).length === 0) { + return null + } + + return seed +} + +export function buildShellEnvAssignments(env: Record) { + return Object.entries(env) + .filter(([key]) => shellEnvNamePattern.test(key)) + .sort(([left], [right]) => left.localeCompare(right)) + .map(([key, value]) => ` ${key}=${shellQuote(value)}`) + .join("") +} + +export function buildDaytonaProviderSeedScript(input: { + configPath: string + seed: DaytonaProviderSeed | null +}) { + if (!input.seed) { + return "" + } + + const configPayload = Buffer.from(JSON.stringify({ provider: input.seed.provider })).toString("base64") + const script = [ + 'const fs = require("node:fs")', + 'const path = require("node:path")', + 'const target = process.argv[1]', + 'const raw = Buffer.from(process.env.OPENWORK_DAYTONA_PROVIDER_CONFIG_B64 || "", "base64").toString("utf8")', + 'const seed = JSON.parse(raw)', + 'fs.mkdirSync(path.dirname(target), { recursive: true })', + 'let existing = {}', + 'if (fs.existsSync(target)) {', + ' try { existing = JSON.parse(fs.readFileSync(target, "utf8")) } catch {}', + '}', + 'const existingProvider = existing.provider && typeof existing.provider === "object" && !Array.isArray(existing.provider) ? existing.provider : {}', + 'const next = { ...existing, provider: { ...existingProvider, ...seed.provider } }', + 'fs.writeFileSync(target, JSON.stringify(next, null, 2) + "\\n")', + ].join("; ") + + return [ + `OPENWORK_DAYTONA_PROVIDER_CONFIG_B64=${shellQuote(configPayload)} node -e ${shellQuote(script)} ${shellQuote(input.configPath)}`, + ].join("\n") +} diff --git a/ee/apps/den-api/src/workers/daytona.ts b/ee/apps/den-api/src/workers/daytona.ts index 1b794cee29..e06f923dc9 100644 --- a/ee/apps/den-api/src/workers/daytona.ts +++ b/ee/apps/den-api/src/workers/daytona.ts @@ -4,6 +4,8 @@ import { DaytonaSandboxTable } from "@openwork-ee/den-db/schema" import { createDenTypeId } from "@openwork-ee/utils/typeid" import { db } from "../db.js" import { env } from "../env.js" +import { buildDaytonaProviderSeedScript, buildShellEnvAssignments, shellQuote, type DaytonaProviderSeed } from "./daytona-provider-seed.js" +import { loadMemberDaytonaProviderSeed, type DaytonaProviderSeedMembership } from "./daytona-provider-seed-loader.js" type WorkerId = typeof DaytonaSandboxTable.$inferSelect.worker_id @@ -13,6 +15,9 @@ type ProvisionInput = { hostToken: string clientToken: string activityToken: string + organizationId?: DaytonaProviderSeedMembership["organizationId"] + memberId?: DaytonaProviderSeedMembership["memberId"] + memberTeamIds?: DaytonaProviderSeedMembership["teamIds"] } type ProvisionedInstance = { @@ -38,10 +43,6 @@ const slug = (value: string) => .replace(/-+/g, "-") .replace(/^-|-$/g, "") -function shellQuote(value: string) { - return `'${value.replace(/'/g, `'"'"'`)}'` -} - function createDaytonaClient() { return new Daytona({ apiKey: env.daytona.apiKey, @@ -192,11 +193,12 @@ function sharedVolumeMounts(workerId: WorkerId, volumeId: string) { ] } -function buildOpenWorkStartCommand(input: ProvisionInput) { +function buildOpenWorkStartCommand(input: ProvisionInput, providerSeed: DaytonaProviderSeed | null) { const verifyRuntimeStep = [ "if ! command -v openwork >/dev/null 2>&1; then echo 'openwork binary missing from Daytona runtime image; rebuild and republish the Daytona snapshot' >&2; exit 1; fi", "if ! command -v opencode >/dev/null 2>&1; then echo 'opencode binary missing from Daytona runtime image; rebuild and republish the Daytona snapshot' >&2; exit 1; fi", ].join("; ") + const providerEnvAssignments = buildShellEnvAssignments(providerSeed?.env ?? {}) const openworkServe = [ "OPENWORK_DATA_DIR=", shellQuote(env.daytona.runtimeDataPath), @@ -216,6 +218,7 @@ function buildOpenWorkStartCommand(input: ProvisionInput) { shellQuote(workerActivityHeartbeatUrl(input.workerId)), " DEN_ACTIVITY_HEARTBEAT_TOKEN=", shellQuote(input.activityToken), + providerEnvAssignments, " openwork serve", ` --workspace ${shellQuote(env.daytona.runtimeWorkspacePath)}`, ` --remote-access`, @@ -237,6 +240,7 @@ set -u mkdir -p ${shellQuote(env.daytona.workspaceMountPath)} ${shellQuote(env.daytona.dataMountPath)} ${shellQuote(env.daytona.runtimeWorkspacePath)} ${shellQuote(env.daytona.runtimeDataPath)} ${shellQuote(env.daytona.sidecarDir)} ${shellQuote(`${env.daytona.runtimeWorkspacePath}/volumes`)} ln -sfn ${shellQuote(env.daytona.workspaceMountPath)} ${shellQuote(`${env.daytona.runtimeWorkspacePath}/volumes/workspace`) } ln -sfn ${shellQuote(env.daytona.dataMountPath)} ${shellQuote(`${env.daytona.runtimeWorkspacePath}/volumes/data`) } +${buildDaytonaProviderSeedScript({ configPath: `${env.daytona.runtimeWorkspacePath}/opencode.jsonc`, seed: providerSeed })} ${verifyRuntimeStep} attempt=0 while [ "$attempt" -lt 3 ]; do @@ -444,6 +448,18 @@ async function upsertDaytonaSandbox(input: { }) } +async function resolveProviderSeed(input: ProvisionInput) { + if (!input.organizationId || !input.memberId) { + return null + } + + return loadMemberDaytonaProviderSeed({ + organizationId: input.organizationId, + memberId: input.memberId, + teamIds: input.memberTeamIds ?? [], + }) +} + export async function getDaytonaSandboxRecord(workerId: WorkerId) { const rows = await db .select() @@ -515,6 +531,7 @@ export async function provisionWorkerOnDaytona( sharedVolumeNameValue, env.daytona.createTimeoutSeconds * 1000, ) + const providerSeed = await resolveProviderSeed(input) let sandbox: Awaited> | null = null try { @@ -564,7 +581,7 @@ export async function provisionWorkerOnDaytona( const command = await sandbox.process.executeSessionCommand( sessionId, { - command: buildOpenWorkStartCommand(input), + command: buildOpenWorkStartCommand(input, providerSeed), runAsync: true, }, 0, diff --git a/ee/apps/den-api/src/workers/provisioner.ts b/ee/apps/den-api/src/workers/provisioner.ts index dcf952a34e..a960172e3e 100644 --- a/ee/apps/den-api/src/workers/provisioner.ts +++ b/ee/apps/den-api/src/workers/provisioner.ts @@ -4,6 +4,7 @@ import { deprovisionWorkerOnDaytona, provisionWorkerOnDaytona, } from "./daytona.js" +import type { DaytonaProviderSeedMembership } from "./daytona-provider-seed-loader.js" import { customDomainForWorker, ensureVercelDnsRecord, @@ -17,6 +18,9 @@ export type ProvisionInput = { hostToken: string clientToken: string activityToken: string + organizationId?: DaytonaProviderSeedMembership["organizationId"] + memberId?: DaytonaProviderSeedMembership["memberId"] + memberTeamIds?: DaytonaProviderSeedMembership["teamIds"] } export type ProvisionedInstance = { diff --git a/ee/apps/den-api/test/daytona-provider-seed.test.ts b/ee/apps/den-api/test/daytona-provider-seed.test.ts new file mode 100644 index 0000000000..b8739ae52f --- /dev/null +++ b/ee/apps/den-api/test/daytona-provider-seed.test.ts @@ -0,0 +1,75 @@ +import { mkdtemp, readFile, writeFile } from "node:fs/promises" +import { tmpdir } from "node:os" +import { join } from "node:path" +import { describe, expect, test } from "bun:test" +import { + buildDaytonaProviderSeed, + buildDaytonaProviderSeedScript, + buildShellEnvAssignments, +} from "../src/workers/daytona-provider-seed.js" + +const openworkProviderConfig = { + id: "openwork", + name: "OpenWork", + npm: "@openrouter/ai-sdk-provider", + env: ["OPENWORK_API_KEY"], + api: "https://inference.openwork.test/api/v1", + options: { + baseURL: "https://inference.openwork.test/api/v1", + }, +} + +describe("Daytona provider seeding", () => { + test("builds opencode provider config and env vars from accessible LLM providers", () => { + const seed = buildDaytonaProviderSeed([ + { + providerId: "openwork", + providerConfig: openworkProviderConfig, + apiKey: "ow_inf_test_key", + }, + ]) + + expect(seed?.provider.openwork).toEqual(openworkProviderConfig) + expect(seed?.env.OPENWORK_API_KEY).toBe("ow_inf_test_key") + }) + + test("ignores unsafe env var names before rendering shell assignments", () => { + const seed = buildDaytonaProviderSeed([ + { + providerId: "custom", + providerConfig: { + env: ["SAFE_KEY", "BAD-NAME", "$(whoami)"], + }, + apiKey: "secret", + }, + ]) + + expect(buildShellEnvAssignments(seed?.env ?? {})).toBe(" SAFE_KEY='secret'") + }) + + test("writes provider config without embedding provider API keys", async () => { + const dir = await mkdtemp(join(tmpdir(), "openwork-daytona-provider-seed-")) + const configPath = join(dir, "opencode.jsonc") + await writeFile(configPath, JSON.stringify({ theme: "dark", provider: { existing: { env: ["EXISTING_KEY"] } } }, null, 2), "utf8") + + const seed = buildDaytonaProviderSeed([ + { + providerId: "openwork", + providerConfig: openworkProviderConfig, + apiKey: "ow_inf_should_not_be_written", + }, + ]) + const script = buildDaytonaProviderSeedScript({ configPath, seed }) + + expect(script).not.toContain("ow_inf_should_not_be_written") + + const proc = Bun.spawn(["sh", "-lc", script]) + expect(await proc.exited).toBe(0) + + const text = await readFile(configPath, "utf8") + expect(text).toContain('"theme": "dark"') + expect(text).toContain('"existing"') + expect(text).toContain('"openwork"') + expect(text).not.toContain("ow_inf_should_not_be_written") + }) +}) From fffb01c97cb5dd573bca14f547e90a5e8dbec32f Mon Sep 17 00:00:00 2001 From: Benjamin Shafii Date: Mon, 1 Jun 2026 23:20:14 -0700 Subject: [PATCH 2/3] feat(den): trigger cloud worker background jobs --- .../scripts/daytona-provider-seed-smoke.ts | 16 +- ee/apps/den-api/src/routes/workers/core.ts | 159 +++++++++++++++++- .../den-api/src/workers/background-jobs.ts | 134 +++++++++++++++ .../src/workers/daytona-provider-seed.ts | 44 ++++- ee/apps/den-api/src/workers/daytona.ts | 8 +- .../test/daytona-provider-seed.test.ts | 44 ++++- .../test/mcp-worker-background-jobs.test.ts | 16 ++ .../test/worker-background-jobs.test.ts | 84 +++++++++ 8 files changed, 491 insertions(+), 14 deletions(-) create mode 100644 ee/apps/den-api/src/workers/background-jobs.ts create mode 100644 ee/apps/den-api/test/mcp-worker-background-jobs.test.ts create mode 100644 ee/apps/den-api/test/worker-background-jobs.test.ts diff --git a/ee/apps/den-api/scripts/daytona-provider-seed-smoke.ts b/ee/apps/den-api/scripts/daytona-provider-seed-smoke.ts index 82edce6a28..00c71b728e 100644 --- a/ee/apps/den-api/scripts/daytona-provider-seed-smoke.ts +++ b/ee/apps/den-api/scripts/daytona-provider-seed-smoke.ts @@ -3,6 +3,8 @@ import { buildDaytonaProviderSeed, buildDaytonaProviderSeedScript, buildShellEnvAssignments, + daytonaProviderSeedConfigPath, + daytonaProviderSeedManifestPath, shellQuote, } from "../src/workers/daytona-provider-seed.js" @@ -89,18 +91,28 @@ async function main() { }, ]) - const configPath = "/tmp/openwork-daytona-provider-seed/opencode.jsonc" + const workspacePath = "/tmp/openwork-daytona-provider-seed" + const configPath = daytonaProviderSeedConfigPath(workspacePath) + const manifestPath = daytonaProviderSeedManifestPath(workspacePath) const validateConfigScript = [ 'const fs = require("node:fs")', 'const config = JSON.parse(fs.readFileSync(process.argv[1], "utf8"))', 'if (!config.provider || !config.provider.openwork) throw new Error("openwork_provider_missing")', `if (JSON.stringify(config).includes(${JSON.stringify(smokeKey)})) throw new Error("api_key_written_to_config")`, ].join("; ") + const validateManifestScript = [ + 'const fs = require("node:fs")', + 'const manifest = JSON.parse(fs.readFileSync(process.argv[1], "utf8"))', + 'if (!manifest.providerIds.includes("openwork")) throw new Error("openwork_manifest_missing")', + 'if (!manifest.envNames.includes("OPENWORK_API_KEY")) throw new Error("openwork_manifest_env_missing")', + `if (JSON.stringify(manifest).includes(${JSON.stringify(smokeKey)})) throw new Error("api_key_written_to_manifest")`, + ].join("; ") const validateEnvScript = `if (process.env.OPENWORK_API_KEY !== ${JSON.stringify(smokeKey)}) throw new Error("openwork_api_key_env_missing")` const commandScript = [ "set -eu", - buildDaytonaProviderSeedScript({ configPath, seed }), + buildDaytonaProviderSeedScript({ configPath, manifestPath, seed }), `node -e ${shellQuote(validateConfigScript)} ${shellQuote(configPath)}`, + `node -e ${shellQuote(validateManifestScript)} ${shellQuote(manifestPath)}`, `${buildShellEnvAssignments(seed?.env ?? {})} node -e ${shellQuote(validateEnvScript)}`, "command -v opencode >/dev/null 2>&1 || { echo 'opencode missing; set DAYTONA_SNAPSHOT to the OpenWork runtime snapshot' >&2; exit 2; }", "opencode --version", diff --git a/ee/apps/den-api/src/routes/workers/core.ts b/ee/apps/den-api/src/routes/workers/core.ts index b86bdf041d..4210def101 100644 --- a/ee/apps/den-api/src/routes/workers/core.ts +++ b/ee/apps/den-api/src/routes/workers/core.ts @@ -5,11 +5,15 @@ import type { Hono } from "hono" import { describeRoute } from "hono-openapi" import { z } from "zod" import { db } from "../../db.js" +import { env } from "../../env.js" import { jsonValidator, paramValidator, queryValidator, requireUserMiddleware, resolveOrganizationContextMiddleware, resolveUserOrganizationsMiddleware } from "../../middleware/index.js" import { denTypeIdSchema, emptyResponse, forbiddenSchema, invalidRequestSchema, jsonResponse, notFoundSchema, unauthorizedSchema } from "../../openapi.js" import { getOrganizationLimitStatus } from "../../organization-limits.js" import type { OrganizationContext } from "../../orgs.js" import { getRequiredUserEmail } from "../../user.js" +import { startWorkerBackgroundJob } from "../../workers/background-jobs.js" +import { buildDaytonaProviderSeedSummary, daytonaProviderSeedConfigPath, daytonaProviderSeedManifestPath } from "../../workers/daytona-provider-seed.js" +import { loadMemberDaytonaProviderSeed } from "../../workers/daytona-provider-seed-loader.js" import type { WorkerRouteVariables } from "./shared.js" import { continueCloudProvisioning, @@ -92,6 +96,35 @@ const workerTokensResponseSchema = z.object({ }).nullable(), }).meta({ ref: "WorkerTokensResponse" }) +const workerProviderSeedPreviewResponseSchema = z.object({ + providerCount: z.number().int(), + providerIds: z.array(z.string()), + envNames: z.array(z.string()), + configPath: z.string(), + manifestPath: z.string(), +}).meta({ ref: "WorkerProviderSeedPreviewResponse" }) + +const workerBackgroundJobCreateSchema = z.object({ + prompt: z.string().trim().min(1), + title: z.string().trim().min(1).max(255).optional(), + model: z.object({ + providerID: z.string().trim().min(1), + modelID: z.string().trim().min(1), + }).optional(), + agent: z.string().trim().min(1).optional(), + variant: z.string().trim().min(1).optional(), +}).meta({ ref: "WorkerBackgroundJobCreateRequest" }) + +const workerBackgroundJobResponseSchema = z.object({ + job: z.object({ + id: z.string(), + status: z.literal("accepted"), + workerId: denTypeIdSchema("worker"), + sessionId: z.string(), + openworkUrl: z.string(), + }), +}).meta({ ref: "WorkerBackgroundJobResponse" }) + const organizationUnavailableSchema = z.object({ error: z.literal("organization_unavailable"), }).meta({ ref: "OrganizationUnavailableError" }) @@ -125,6 +158,26 @@ const workerRuntimeUnavailableSchema = z.object({ message: z.string(), })).meta({ ref: "WorkerConnectionError" }) +function memberTeamIdsForContext(context: OrganizationContext) { + return context.teams + .filter((team) => team.memberIds.includes(context.currentMember.id)) + .map((team) => team.id) +} + +async function buildProviderSeedPreview(context: OrganizationContext) { + const seed = await loadMemberDaytonaProviderSeed({ + organizationId: context.organization.id, + memberId: context.currentMember.id, + teamIds: memberTeamIdsForContext(context), + }) + + return buildDaytonaProviderSeedSummary({ + configPath: daytonaProviderSeedConfigPath(env.daytona.runtimeWorkspacePath), + manifestPath: daytonaProviderSeedManifestPath(env.daytona.runtimeWorkspacePath), + seed, + }) +} + export function registerWorkerCoreRoutes(app: Hono) { app.get( "/v1/workers", @@ -275,10 +328,6 @@ export function registerWorkerCoreRoutes team.memberIds.includes(organizationContext.currentMember.id)) - .map((team) => team.id) - void continueCloudProvisioning({ workerId, name: input.name, @@ -287,7 +336,7 @@ export function registerWorkerCoreRoutes { + const organizationContext: OrganizationContext = c.get("organizationContext") + return c.json(await buildProviderSeedPreview(organizationContext)) + }, + ) + + app.post( + "/v1/workers/:id/background-jobs", + describeRoute({ + tags: ["Workers"], + summary: "Start cloud worker background job", + description: "Starts a background OpenCode session on a cloud worker and returns immediately with the worker session id. This route is intentionally MCP-visible so OpenWork Cloud MCP clients can trigger cloud work without opening the desktop UI.", + responses: { + 202: jsonResponse("Background job accepted by the worker.", workerBackgroundJobResponseSchema), + 400: jsonResponse("The background job request was invalid.", invalidRequestSchema), + 401: jsonResponse("The caller must be signed in to start worker background jobs.", unauthorizedSchema), + 404: jsonResponse("The worker could not be found.", notFoundSchema), + 409: jsonResponse("The worker runtime is not ready for background jobs.", workerRuntimeUnavailableSchema), + }, + }), + requireUserMiddleware, + resolveUserOrganizationsMiddleware, + paramValidator(workerIdParamSchema), + jsonValidator(workerBackgroundJobCreateSchema), + async (c) => { + const orgId = c.get("activeOrganizationId") + const params = c.req.valid("param") + const input = c.req.valid("json") + + if (!orgId) { + return c.json({ error: "worker_not_found" }, 404) + } + + let workerId + try { + workerId = parseWorkerIdParam(params.id) + } catch { + return c.json({ error: "worker_not_found" }, 404) + } + + const worker = await getWorkerByIdForOrg(workerId, orgId) + if (!worker || worker.destination !== "cloud") { + return c.json({ error: "worker_not_found" }, 404) + } + + const access = await getWorkerTokensAndConnect(worker) + if ("error" in access && access.error) { + return c.json(access.error.body, 409) + } + + if (!access.connect?.openworkUrl || !access.connect.workspaceId) { + return c.json({ + error: "worker_runtime_unavailable", + message: "Worker runtime access is not ready yet. Wait for provisioning to finish and try again.", + }, 409) + } + + let job + try { + job = await startWorkerBackgroundJob({ + openworkUrl: access.connect.openworkUrl, + clientToken: access.tokens.client, + prompt: input.prompt, + title: input.title, + model: input.model, + agent: input.agent, + variant: input.variant, + }) + } catch (error) { + return c.json({ + error: "worker_runtime_unavailable", + message: error instanceof Error ? error.message : "Worker did not accept the background job.", + }, 409) + } + + return c.json({ + job: { + id: job.jobId, + status: job.status, + workerId: worker.id, + sessionId: job.sessionId, + openworkUrl: job.openworkUrl, + }, + }, 202) + }, + ) + app.get( "/v1/workers/:id", describeRoute({ diff --git a/ee/apps/den-api/src/workers/background-jobs.ts b/ee/apps/den-api/src/workers/background-jobs.ts new file mode 100644 index 0000000000..baf3d2e9e7 --- /dev/null +++ b/ee/apps/den-api/src/workers/background-jobs.ts @@ -0,0 +1,134 @@ +import { randomBytes } from "node:crypto" + +export type WorkerBackgroundJobInput = { + openworkUrl: string + clientToken: string + prompt: string + title?: string + model?: { + providerID: string + modelID: string + } + agent?: string + variant?: string +} + +export type WorkerBackgroundJobResult = { + jobId: string + status: "accepted" + openworkUrl: string + sessionId: string +} + +type WorkerBackgroundJobFetch = typeof fetch + +function normalizeUrl(value: string) { + return value.trim().replace(/\/+$/, "") +} + +function jsonHeaders(clientToken: string) { + return { + Accept: "application/json", + Authorization: `Bearer ${clientToken}`, + "Content-Type": "application/json", + } +} + +function createJobId() { + return `job_${randomBytes(16).toString("hex")}` +} + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value) +} + +export function readOpencodeSessionId(payload: unknown) { + if (!isRecord(payload)) { + return null + } + + if (typeof payload.id === "string") { + return payload.id + } + + if (isRecord(payload.data) && typeof payload.data.id === "string") { + return payload.data.id + } + + if (isRecord(payload.session) && typeof payload.session.id === "string") { + return payload.session.id + } + + return null +} + +export function buildBackgroundJobPromptBody(input: WorkerBackgroundJobInput) { + return { + ...(input.model ? { model: input.model } : {}), + ...(input.agent ? { agent: input.agent } : {}), + ...(input.variant ? { variant: input.variant } : {}), + parts: [{ type: "text", text: input.prompt }], + } +} + +async function fetchJson(input: { + fetchImpl: WorkerBackgroundJobFetch + url: string + clientToken: string + body: unknown +}) { + const response = await input.fetchImpl(input.url, { + method: "POST", + headers: jsonHeaders(input.clientToken), + body: JSON.stringify(input.body), + }) + const text = await response.text() + let payload: unknown = null + if (text) { + try { + payload = JSON.parse(text) as unknown + } catch { + payload = { message: text } + } + } + + if (!response.ok) { + const message = isRecord(payload) && typeof payload.message === "string" + ? payload.message + : `Worker background job request failed with ${response.status}` + throw new Error(message) + } + + return payload +} + +export async function startWorkerBackgroundJob( + input: WorkerBackgroundJobInput, + fetchImpl: WorkerBackgroundJobFetch = fetch, +): Promise { + const openworkUrl = normalizeUrl(input.openworkUrl) + const sessionPayload = await fetchJson({ + fetchImpl, + url: `${openworkUrl}/opencode/session`, + clientToken: input.clientToken, + body: input.title ? { title: input.title } : {}, + }) + const sessionId = readOpencodeSessionId(sessionPayload) + if (!sessionId) { + throw new Error("Worker did not return an OpenCode session id") + } + + await fetchJson({ + fetchImpl, + url: `${openworkUrl}/opencode/session/${encodeURIComponent(sessionId)}/prompt_async`, + clientToken: input.clientToken, + body: buildBackgroundJobPromptBody(input), + }) + + return { + jobId: createJobId(), + status: "accepted", + openworkUrl, + sessionId, + } +} diff --git a/ee/apps/den-api/src/workers/daytona-provider-seed.ts b/ee/apps/den-api/src/workers/daytona-provider-seed.ts index f3f8d5d27e..27deb5a84a 100644 --- a/ee/apps/den-api/src/workers/daytona-provider-seed.ts +++ b/ee/apps/den-api/src/workers/daytona-provider-seed.ts @@ -9,7 +9,24 @@ export type DaytonaProviderSeed = { env: Record } +export type DaytonaProviderSeedSummary = { + providerCount: number + providerIds: string[] + envNames: string[] + configPath: string + manifestPath: string +} + const shellEnvNamePattern = /^[A-Za-z_][A-Za-z0-9_]*$/ +const daytonaProviderSeedManifestRelativePath = ".openwork/daytona-provider-seed.json" + +export function daytonaProviderSeedConfigPath(workspacePath: string) { + return `${workspacePath.replace(/\/+$/, "")}/opencode.jsonc` +} + +export function daytonaProviderSeedManifestPath(workspacePath: string) { + return `${workspacePath.replace(/\/+$/, "")}/${daytonaProviderSeedManifestRelativePath}` +} export function shellQuote(value: string) { return `'${value.replace(/'/g, `'"'"'`)}'` @@ -64,22 +81,42 @@ export function buildShellEnvAssignments(env: Record) { .join("") } +export function buildDaytonaProviderSeedSummary(input: { + configPath: string + manifestPath: string + seed: DaytonaProviderSeed | null +}): DaytonaProviderSeedSummary { + return { + providerCount: Object.keys(input.seed?.provider ?? {}).length, + providerIds: Object.keys(input.seed?.provider ?? {}).sort(), + envNames: Object.keys(input.seed?.env ?? {}).sort(), + configPath: input.configPath, + manifestPath: input.manifestPath, + } +} + export function buildDaytonaProviderSeedScript(input: { configPath: string + manifestPath: string seed: DaytonaProviderSeed | null }) { if (!input.seed) { return "" } - const configPayload = Buffer.from(JSON.stringify({ provider: input.seed.provider })).toString("base64") + const seedPayload = Buffer.from(JSON.stringify({ + provider: input.seed.provider, + summary: buildDaytonaProviderSeedSummary(input), + })).toString("base64") const script = [ 'const fs = require("node:fs")', 'const path = require("node:path")', 'const target = process.argv[1]', - 'const raw = Buffer.from(process.env.OPENWORK_DAYTONA_PROVIDER_CONFIG_B64 || "", "base64").toString("utf8")', + 'const manifest = process.argv[2]', + 'const raw = Buffer.from(process.env.OPENWORK_DAYTONA_PROVIDER_SEED_B64 || "", "base64").toString("utf8")', 'const seed = JSON.parse(raw)', 'fs.mkdirSync(path.dirname(target), { recursive: true })', + 'fs.mkdirSync(path.dirname(manifest), { recursive: true })', 'let existing = {}', 'if (fs.existsSync(target)) {', ' try { existing = JSON.parse(fs.readFileSync(target, "utf8")) } catch {}', @@ -87,9 +124,10 @@ export function buildDaytonaProviderSeedScript(input: { 'const existingProvider = existing.provider && typeof existing.provider === "object" && !Array.isArray(existing.provider) ? existing.provider : {}', 'const next = { ...existing, provider: { ...existingProvider, ...seed.provider } }', 'fs.writeFileSync(target, JSON.stringify(next, null, 2) + "\\n")', + 'fs.writeFileSync(manifest, JSON.stringify({ ...seed.summary, seededAt: new Date().toISOString() }, null, 2) + "\\n")', ].join("; ") return [ - `OPENWORK_DAYTONA_PROVIDER_CONFIG_B64=${shellQuote(configPayload)} node -e ${shellQuote(script)} ${shellQuote(input.configPath)}`, + `OPENWORK_DAYTONA_PROVIDER_SEED_B64=${shellQuote(seedPayload)} node -e ${shellQuote(script)} ${shellQuote(input.configPath)} ${shellQuote(input.manifestPath)}`, ].join("\n") } diff --git a/ee/apps/den-api/src/workers/daytona.ts b/ee/apps/den-api/src/workers/daytona.ts index e06f923dc9..c5ae68ac5f 100644 --- a/ee/apps/den-api/src/workers/daytona.ts +++ b/ee/apps/den-api/src/workers/daytona.ts @@ -4,7 +4,7 @@ import { DaytonaSandboxTable } from "@openwork-ee/den-db/schema" import { createDenTypeId } from "@openwork-ee/utils/typeid" import { db } from "../db.js" import { env } from "../env.js" -import { buildDaytonaProviderSeedScript, buildShellEnvAssignments, shellQuote, type DaytonaProviderSeed } from "./daytona-provider-seed.js" +import { buildDaytonaProviderSeedScript, buildShellEnvAssignments, daytonaProviderSeedConfigPath, daytonaProviderSeedManifestPath, shellQuote, type DaytonaProviderSeed } from "./daytona-provider-seed.js" import { loadMemberDaytonaProviderSeed, type DaytonaProviderSeedMembership } from "./daytona-provider-seed-loader.js" type WorkerId = typeof DaytonaSandboxTable.$inferSelect.worker_id @@ -240,7 +240,11 @@ set -u mkdir -p ${shellQuote(env.daytona.workspaceMountPath)} ${shellQuote(env.daytona.dataMountPath)} ${shellQuote(env.daytona.runtimeWorkspacePath)} ${shellQuote(env.daytona.runtimeDataPath)} ${shellQuote(env.daytona.sidecarDir)} ${shellQuote(`${env.daytona.runtimeWorkspacePath}/volumes`)} ln -sfn ${shellQuote(env.daytona.workspaceMountPath)} ${shellQuote(`${env.daytona.runtimeWorkspacePath}/volumes/workspace`) } ln -sfn ${shellQuote(env.daytona.dataMountPath)} ${shellQuote(`${env.daytona.runtimeWorkspacePath}/volumes/data`) } -${buildDaytonaProviderSeedScript({ configPath: `${env.daytona.runtimeWorkspacePath}/opencode.jsonc`, seed: providerSeed })} +${buildDaytonaProviderSeedScript({ + configPath: daytonaProviderSeedConfigPath(env.daytona.runtimeWorkspacePath), + manifestPath: daytonaProviderSeedManifestPath(env.daytona.runtimeWorkspacePath), + seed: providerSeed, +})} ${verifyRuntimeStep} attempt=0 while [ "$attempt" -lt 3 ]; do diff --git a/ee/apps/den-api/test/daytona-provider-seed.test.ts b/ee/apps/den-api/test/daytona-provider-seed.test.ts index b8739ae52f..3917d3acd5 100644 --- a/ee/apps/den-api/test/daytona-provider-seed.test.ts +++ b/ee/apps/den-api/test/daytona-provider-seed.test.ts @@ -5,7 +5,10 @@ import { describe, expect, test } from "bun:test" import { buildDaytonaProviderSeed, buildDaytonaProviderSeedScript, + buildDaytonaProviderSeedSummary, buildShellEnvAssignments, + daytonaProviderSeedConfigPath, + daytonaProviderSeedManifestPath, } from "../src/workers/daytona-provider-seed.js" const openworkProviderConfig = { @@ -20,6 +23,11 @@ const openworkProviderConfig = { } describe("Daytona provider seeding", () => { + test("normalizes runtime config and manifest paths", () => { + expect(daytonaProviderSeedConfigPath("/tmp/workspace/")).toBe("/tmp/workspace/opencode.jsonc") + expect(daytonaProviderSeedManifestPath("/tmp/workspace/")).toBe("/tmp/workspace/.openwork/daytona-provider-seed.json") + }) + test("builds opencode provider config and env vars from accessible LLM providers", () => { const seed = buildDaytonaProviderSeed([ { @@ -47,9 +55,35 @@ describe("Daytona provider seeding", () => { expect(buildShellEnvAssignments(seed?.env ?? {})).toBe(" SAFE_KEY='secret'") }) - test("writes provider config without embedding provider API keys", async () => { + test("summarizes seeded providers without exposing API key values", () => { + const seed = buildDaytonaProviderSeed([ + { + providerId: "openwork", + providerConfig: openworkProviderConfig, + apiKey: "ow_inf_summary_secret", + }, + ]) + + const summary = buildDaytonaProviderSeedSummary({ + configPath: "/workspace/opencode.jsonc", + manifestPath: "/workspace/.openwork/daytona-provider-seed.json", + seed, + }) + + expect(summary).toEqual({ + providerCount: 1, + providerIds: ["openwork"], + envNames: ["OPENWORK_API_KEY"], + configPath: "/workspace/opencode.jsonc", + manifestPath: "/workspace/.openwork/daytona-provider-seed.json", + }) + expect(JSON.stringify(summary)).not.toContain("ow_inf_summary_secret") + }) + + test("writes provider config and manifest without embedding provider API keys", async () => { const dir = await mkdtemp(join(tmpdir(), "openwork-daytona-provider-seed-")) const configPath = join(dir, "opencode.jsonc") + const manifestPath = daytonaProviderSeedManifestPath(dir) await writeFile(configPath, JSON.stringify({ theme: "dark", provider: { existing: { env: ["EXISTING_KEY"] } } }, null, 2), "utf8") const seed = buildDaytonaProviderSeed([ @@ -59,7 +93,7 @@ describe("Daytona provider seeding", () => { apiKey: "ow_inf_should_not_be_written", }, ]) - const script = buildDaytonaProviderSeedScript({ configPath, seed }) + const script = buildDaytonaProviderSeedScript({ configPath, manifestPath, seed }) expect(script).not.toContain("ow_inf_should_not_be_written") @@ -71,5 +105,11 @@ describe("Daytona provider seeding", () => { expect(text).toContain('"existing"') expect(text).toContain('"openwork"') expect(text).not.toContain("ow_inf_should_not_be_written") + + const manifest = await readFile(manifestPath, "utf8") + expect(manifest).toContain('"providerIds"') + expect(manifest).toContain('"openwork"') + expect(manifest).toContain('"OPENWORK_API_KEY"') + expect(manifest).not.toContain("ow_inf_should_not_be_written") }) }) diff --git a/ee/apps/den-api/test/mcp-worker-background-jobs.test.ts b/ee/apps/den-api/test/mcp-worker-background-jobs.test.ts new file mode 100644 index 0000000000..6e6cd739c7 --- /dev/null +++ b/ee/apps/den-api/test/mcp-worker-background-jobs.test.ts @@ -0,0 +1,16 @@ +import { describe, expect, test } from "bun:test" +import { isMcpOperationAllowed, requiredScopeForMethod } from "../src/mcp/policy.js" + +describe("MCP worker background jobs", () => { + test("allows Workers-tagged background job operations through Cloud MCP", () => { + expect(isMcpOperationAllowed({ + method: "POST", + path: "/v1/workers/{id}/background-jobs", + operation: { + operationId: "postV1WorkersByIdBackgroundJobs", + tags: ["Workers"], + }, + })).toBe(true) + expect(requiredScopeForMethod("POST")).toBe("mcp:write") + }) +}) diff --git a/ee/apps/den-api/test/worker-background-jobs.test.ts b/ee/apps/den-api/test/worker-background-jobs.test.ts new file mode 100644 index 0000000000..ec29a888a0 --- /dev/null +++ b/ee/apps/den-api/test/worker-background-jobs.test.ts @@ -0,0 +1,84 @@ +import { describe, expect, test } from "bun:test" +import { + buildBackgroundJobPromptBody, + readOpencodeSessionId, + startWorkerBackgroundJob, +} from "../src/workers/background-jobs.js" + +function jsonResponse(body: unknown, status = 200) { + return new Response(JSON.stringify(body), { + status, + headers: { "Content-Type": "application/json" }, + }) +} + +describe("worker background jobs", () => { + test("reads OpenCode session ids from common response shapes", () => { + expect(readOpencodeSessionId({ id: "ses_direct" })).toBe("ses_direct") + expect(readOpencodeSessionId({ data: { id: "ses_data" } })).toBe("ses_data") + expect(readOpencodeSessionId({ session: { id: "ses_session" } })).toBe("ses_session") + expect(readOpencodeSessionId({})).toBeNull() + }) + + test("builds prompt_async body from a simple cloud prompt", () => { + expect(buildBackgroundJobPromptBody({ + openworkUrl: "https://worker.example/w/ws_1", + clientToken: "client-token", + prompt: "Review the repo", + model: { providerID: "openwork", modelID: "openwork/deepseek/deepseek-v4-flash" }, + agent: "openwork", + variant: "medium", + })).toEqual({ + model: { providerID: "openwork", modelID: "openwork/deepseek/deepseek-v4-flash" }, + agent: "openwork", + variant: "medium", + parts: [{ type: "text", text: "Review the repo" }], + }) + }) + + test("creates a cloud session and starts prompt_async through injected fetch", async () => { + const calls: Array<{ url: string; body: unknown; authorization: string | null }> = [] + const fetchImpl: typeof fetch = async (url, init) => { + const request = new Request(url, init) + calls.push({ + url: request.url, + body: await request.json(), + authorization: request.headers.get("authorization"), + }) + + if (request.url.endsWith("/opencode/session")) { + return jsonResponse({ data: { id: "ses_cloud" } }) + } + + if (request.url.endsWith("/opencode/session/ses_cloud/prompt_async")) { + return jsonResponse({ ok: true, accepted: true }) + } + + return jsonResponse({ message: "unexpected request" }, 404) + } + + const job = await startWorkerBackgroundJob({ + openworkUrl: "https://worker.example/w/ws_1/", + clientToken: "client-token", + prompt: "Run in cloud", + title: "Cloud job", + }, fetchImpl) + + expect(job.status).toBe("accepted") + expect(job.sessionId).toBe("ses_cloud") + expect(job.openworkUrl).toBe("https://worker.example/w/ws_1") + expect(job.jobId.startsWith("job_")).toBe(true) + expect(calls).toEqual([ + { + url: "https://worker.example/w/ws_1/opencode/session", + body: { title: "Cloud job" }, + authorization: "Bearer client-token", + }, + { + url: "https://worker.example/w/ws_1/opencode/session/ses_cloud/prompt_async", + body: { parts: [{ type: "text", text: "Run in cloud" }] }, + authorization: "Bearer client-token", + }, + ]) + }) +}) From 258dc11447722f229d8555f29f4eee8347d762ea Mon Sep 17 00:00:00 2001 From: Benjamin Shafii Date: Mon, 1 Jun 2026 23:53:30 -0700 Subject: [PATCH 3/3] feat(den): add cloud tasks for dynamic workers --- ee/apps/den-api/src/app.ts | 3 + ee/apps/den-api/src/mcp/policy.ts | 5 + .../den-api/src/routes/cloud-tasks/index.ts | 703 ++++++++++++++++++ ee/apps/den-api/src/routes/workers/core.ts | 103 --- .../den-api/src/workers/background-jobs.ts | 29 + ee/apps/den-api/test/mcp-cloud-tasks.test.ts | 37 + .../test/mcp-worker-background-jobs.test.ts | 16 - .../test/worker-background-jobs.test.ts | 24 + .../den-db/drizzle/0020_cloud_tasks.sql | 51 ++ ee/packages/den-db/drizzle/meta/_journal.json | 9 +- ee/packages/den-db/src/schema/workers.ts | 55 +- ee/packages/utils/src/typeid.ts | 2 + 12 files changed, 916 insertions(+), 121 deletions(-) create mode 100644 ee/apps/den-api/src/routes/cloud-tasks/index.ts create mode 100644 ee/apps/den-api/test/mcp-cloud-tasks.test.ts delete mode 100644 ee/apps/den-api/test/mcp-worker-background-jobs.test.ts create mode 100644 ee/packages/den-db/drizzle/0020_cloud_tasks.sql diff --git a/ee/apps/den-api/src/app.ts b/ee/apps/den-api/src/app.ts index 8e51953cfd..a7e0349b77 100644 --- a/ee/apps/den-api/src/app.ts +++ b/ee/apps/den-api/src/app.ts @@ -14,6 +14,7 @@ import type { MemberTeamsContext, OrganizationContextVariables, UserOrganization import { buildOperationId, emptyResponse, htmlResponse, jsonResponse } from "./openapi.js" import { registerAdminRoutes } from "./routes/admin/index.js" import { registerAuthRoutes } from "./routes/auth/index.js" +import { registerCloudTaskRoutes } from "./routes/cloud-tasks/index.js" import { registerMeRoutes } from "./routes/me/index.js" import { registerOrgRoutes } from "./routes/org/index.js" import { registerTelemetryRoutes } from "./routes/telemetry/index.js" @@ -117,6 +118,7 @@ app.get( registerAdminRoutes(app) registerAuthRoutes(app) +registerCloudTaskRoutes(app) registerMeRoutes(app) registerOrgRoutes(app) registerVersionRoutes(app) @@ -169,6 +171,7 @@ app.get( { name: "LLM Providers", description: "Organization LLM provider catalog, configuration, and access routes." }, { name: "Skills", description: "Organization skill authoring and sharing routes." }, { name: "Skill Hubs", description: "Organization skill hub management and access routes." }, + { name: "Cloud Tasks", description: "Task-first OpenWork Cloud automations and run dispatch." }, { name: "Workers", description: "Worker lifecycle, billing, and runtime routes." }, { name: "Worker Runtime", description: "Worker runtime inspection and upgrade routes." }, { name: "Worker Activity", description: "Worker heartbeat and activity reporting routes." }, diff --git a/ee/apps/den-api/src/mcp/policy.ts b/ee/apps/den-api/src/mcp/policy.ts index 605de9631d..d8f9ed4222 100644 --- a/ee/apps/den-api/src/mcp/policy.ts +++ b/ee/apps/den-api/src/mcp/policy.ts @@ -11,6 +11,7 @@ const SAFE_INCLUDED_TAGS = new Set([ "LLM Providers", "Skills", "Skill Hubs", + "Cloud Tasks", "Workers", "Worker Runtime", "Worker Activity", @@ -31,6 +32,10 @@ const BLOCKED_OPERATION_IDS = new Set([ "deleteV1OrgsByOrgId", "postWorkersByWorkerIdTokens", "postV1WorkersByWorkerIdTokens", + "postWorkersByIdBackgroundJobs", + "postV1WorkersByIdBackgroundJobs", + "postWorkersByWorkerIdBackgroundJobs", + "postV1WorkersByWorkerIdBackgroundJobs", ]) export type OpenApiOperation = { diff --git a/ee/apps/den-api/src/routes/cloud-tasks/index.ts b/ee/apps/den-api/src/routes/cloud-tasks/index.ts new file mode 100644 index 0000000000..57729af750 --- /dev/null +++ b/ee/apps/den-api/src/routes/cloud-tasks/index.ts @@ -0,0 +1,703 @@ +import { and, desc, eq } from "@openwork-ee/den-db/drizzle" +import { + CloudTaskRunTable, + CloudTaskTable, + WorkerInstanceTable, + WorkerTable, + WorkerTokenTable, +} from "@openwork-ee/den-db/schema" +import { createDenTypeId, normalizeDenTypeId } from "@openwork-ee/utils/typeid" +import type { Hono } from "hono" +import { describeRoute } from "hono-openapi" +import { z } from "zod" +import { requireCloudWorkerAccess } from "../../billing/polar.js" +import { db } from "../../db.js" +import { jsonValidator, paramValidator, queryValidator, requireUserMiddleware, resolveOrganizationContextMiddleware, type OrganizationContextVariables, type UserOrganizationsContext } from "../../middleware/index.js" +import { getOrganizationLimitStatus } from "../../organization-limits.js" +import type { OrganizationContext } from "../../orgs.js" +import { denTypeIdSchema, invalidRequestSchema, jsonResponse, notFoundSchema, unauthorizedSchema } from "../../openapi.js" +import type { AuthContextVariables } from "../../session.js" +import { getRequiredUserEmail } from "../../user.js" +import { buildCloudTaskRunJobInput, startWorkerBackgroundJob } from "../../workers/background-jobs.js" +import { provisionWorker } from "../../workers/provisioner.js" +import { getWorkerByIdForOrg, getWorkerTokensAndConnect, token } from "../workers/shared.js" + +type CloudTaskRouteVariables = AuthContextVariables & Partial & Partial + +type CloudTaskRow = typeof CloudTaskTable.$inferSelect +type CloudTaskRunRow = typeof CloudTaskRunTable.$inferSelect +type NormalizedCloudTaskSchedule = + | { + scheduleType: "manual" + scheduleTimeOfDay: null + scheduleTimezone: null + nextRunAt: null + } + | { + scheduleType: "daily" + scheduleTimeOfDay: string + scheduleTimezone: string + nextRunAt: Date + } +type CloudTaskRunAccessResult = + | { allowed: true } + | { allowed: false; status: 400; body: { error: "user_email_required" } } + | { allowed: false; status: 402; body: { error: "cloud_worker_billing_unavailable"; message: string } } + | { allowed: false; status: 409; body: { error: "org_limit_reached"; limitType: "workers"; limit: number; currentCount: number; message: string } } + +const timeOfDaySchema = z.string().regex(/^([01]\d|2[0-3]):[0-5]\d$/, "Use 24-hour HH:mm time.") + +export function isValidTimeZone(value: string) { + try { + new Intl.DateTimeFormat("en-US", { timeZone: value }).format(new Date()) + return true + } catch { + return false + } +} + +const cloudTaskScheduleInputSchema = z.object({ + type: z.enum(["manual", "daily"]).default("manual"), + timeOfDay: timeOfDaySchema.optional(), + timezone: z.string().trim().min(1).max(64).refine(isValidTimeZone, "Use a valid IANA time zone, such as UTC or America/Los_Angeles.").optional(), +}).optional() + +const cloudTaskCreateSchema = z.object({ + name: z.string().trim().min(1).max(255).optional(), + prompt: z.string().trim().min(1).max(12000), + schedule: cloudTaskScheduleInputSchema, + model: z.object({ + providerID: z.string().trim().min(1).max(255), + modelID: z.string().trim().min(1).max(255), + }).optional(), + agent: z.string().trim().min(1).max(255).optional(), + variant: z.string().trim().min(1).max(255).optional(), + enabled: z.boolean().default(true), +}).meta({ ref: "CloudTaskCreateRequest" }) + +const cloudTaskIdParamSchema = z.object({ + id: denTypeIdSchema("cloudTask"), +}) + +const listCloudTasksQuerySchema = z.object({ + limit: z.coerce.number().int().min(1).max(100).default(50), +}) + +const cloudTaskSchema = z.object({ + id: denTypeIdSchema("cloudTask"), + orgId: denTypeIdSchema("organization"), + createdByUserId: denTypeIdSchema("user").nullable(), + createdByMemberId: denTypeIdSchema("member").nullable(), + name: z.string(), + prompt: z.string(), + scheduleType: z.enum(["manual", "daily"]), + scheduleTimeOfDay: z.string().nullable(), + scheduleTimezone: z.string().nullable(), + model: z.object({ + providerID: z.string(), + modelID: z.string(), + }).nullable(), + agent: z.string().nullable(), + variant: z.string().nullable(), + enabled: z.boolean(), + nextRunAt: z.string().datetime().nullable(), + lastRunId: denTypeIdSchema("cloudTaskRun").nullable(), + createdAt: z.string().datetime(), + updatedAt: z.string().datetime(), +}).meta({ ref: "CloudTask" }) + +const cloudTaskRunSchema = z.object({ + id: denTypeIdSchema("cloudTaskRun"), + taskId: denTypeIdSchema("cloudTask"), + orgId: denTypeIdSchema("organization"), + workerId: denTypeIdSchema("worker").nullable(), + status: z.enum(["pending", "provisioning", "running", "accepted", "failed", "cancelled"]), + sessionId: z.string().nullable(), + openworkUrl: z.string().nullable(), + errorMessage: z.string().nullable(), + startedAt: z.string().datetime().nullable(), + completedAt: z.string().datetime().nullable(), + createdAt: z.string().datetime(), + updatedAt: z.string().datetime(), +}).meta({ ref: "CloudTaskRun" }) + +const cloudTaskListResponseSchema = z.object({ + tasks: z.array(cloudTaskSchema), +}).meta({ ref: "CloudTaskListResponse" }) + +const cloudTaskResponseSchema = z.object({ + task: cloudTaskSchema, +}).meta({ ref: "CloudTaskResponse" }) + +const cloudTaskRunResponseSchema = z.object({ + task: cloudTaskSchema, + run: cloudTaskRunSchema, +}).meta({ ref: "CloudTaskRunResponse" }) + +const paymentRequiredSchema = z.object({ + error: z.literal("cloud_worker_billing_unavailable"), + message: z.string(), +}).meta({ ref: "CloudTaskPaymentRequiredError" }) + +const userEmailRequiredSchema = z.object({ + error: z.literal("user_email_required"), +}).meta({ ref: "CloudTaskUserEmailRequiredError" }) + +const orgLimitReachedSchema = z.object({ + error: z.literal("org_limit_reached"), + limitType: z.literal("workers"), + limit: z.number().int(), + currentCount: z.number().int(), + message: z.string(), +}).meta({ ref: "CloudTaskOrgLimitReachedError" }) + +const workerRuntimeUnavailableSchema = z.object({ + error: z.literal("worker_runtime_unavailable"), + message: z.string(), +}).meta({ ref: "CloudTaskWorkerRuntimeUnavailableError" }) + +function numberPart(parts: Intl.DateTimeFormatPart[], type: Intl.DateTimeFormatPartTypes) { + const value = parts.find((part) => part.type === type)?.value + if (!value) { + throw new Error(`Missing ${type} from formatted date.`) + } + return Number.parseInt(value, 10) +} + +function zonedParts(date: Date, timeZone: string) { + const parts = new Intl.DateTimeFormat("en-US", { + timeZone, + hourCycle: "h23", + year: "numeric", + month: "2-digit", + day: "2-digit", + hour: "2-digit", + minute: "2-digit", + second: "2-digit", + }).formatToParts(date) + + return { + year: numberPart(parts, "year"), + month: numberPart(parts, "month"), + day: numberPart(parts, "day"), + hour: numberPart(parts, "hour"), + minute: numberPart(parts, "minute"), + second: numberPart(parts, "second"), + } +} + +function zonedTimeToUtc(input: { + year: number + month: number + day: number + hour: number + minute: number + timeZone: string +}) { + const utcGuess = new Date(Date.UTC(input.year, input.month - 1, input.day, input.hour, input.minute, 0, 0)) + const actualParts = zonedParts(utcGuess, input.timeZone) + const actualAsUtc = Date.UTC( + actualParts.year, + actualParts.month - 1, + actualParts.day, + actualParts.hour, + actualParts.minute, + actualParts.second, + ) + const expectedAsUtc = Date.UTC(input.year, input.month - 1, input.day, input.hour, input.minute, 0, 0) + return new Date(expectedAsUtc - (actualAsUtc - utcGuess.getTime())) +} + +export function nextDailyRunAt(input: { + timeOfDay: string + timezone: string + now?: Date +}) { + const [hourPart, minutePart] = input.timeOfDay.split(":") + const hour = Number.parseInt(hourPart, 10) + const minute = Number.parseInt(minutePart, 10) + const now = input.now ?? new Date() + const today = zonedParts(now, input.timezone) + let candidate = zonedTimeToUtc({ + year: today.year, + month: today.month, + day: today.day, + hour, + minute, + timeZone: input.timezone, + }) + + if (candidate.getTime() <= now.getTime()) { + const nextLocalDay = new Date(Date.UTC(today.year, today.month - 1, today.day + 1)) + candidate = zonedTimeToUtc({ + year: nextLocalDay.getUTCFullYear(), + month: nextLocalDay.getUTCMonth() + 1, + day: nextLocalDay.getUTCDate(), + hour, + minute, + timeZone: input.timezone, + }) + } + + return candidate +} + +export function normalizeCloudTaskSchedule(input: z.infer): NormalizedCloudTaskSchedule { + if (input?.type !== "daily") { + const scheduleType = "manual" + return { + scheduleType, + scheduleTimeOfDay: null, + scheduleTimezone: null, + nextRunAt: null, + } + } + + const scheduleTimeOfDay = input.timeOfDay ?? "09:00" + const scheduleTimezone = input.timezone ?? "UTC" + const scheduleType = "daily" + return { + scheduleType, + scheduleTimeOfDay, + scheduleTimezone, + nextRunAt: nextDailyRunAt({ timeOfDay: scheduleTimeOfDay, timezone: scheduleTimezone }), + } +} + +function defaultCloudTaskName(prompt: string) { + const normalized = prompt.replace(/\s+/g, " ").trim() + return normalized.length > 80 ? `${normalized.slice(0, 77)}...` : normalized +} + +function memberTeamIdsForContext(context: OrganizationContext) { + return context.teams + .filter((team) => team.memberIds.includes(context.currentMember.id)) + .map((team) => team.id) +} + +export function toCloudTaskResponse(row: CloudTaskRow) { + const model = row.model_provider_id && row.model_id + ? { providerID: row.model_provider_id, modelID: row.model_id } + : null + + return { + id: row.id, + orgId: row.org_id, + createdByUserId: row.created_by_user_id, + createdByMemberId: row.created_by_member_id, + name: row.name, + prompt: row.prompt, + scheduleType: row.schedule_type, + scheduleTimeOfDay: row.schedule_time_of_day, + scheduleTimezone: row.schedule_timezone, + model, + agent: row.agent, + variant: row.variant, + enabled: row.enabled, + nextRunAt: row.next_run_at, + lastRunId: row.last_run_id, + createdAt: row.created_at, + updatedAt: row.updated_at, + } +} + +export function toCloudTaskRunResponse(row: CloudTaskRunRow) { + return { + id: row.id, + taskId: row.task_id, + orgId: row.org_id, + workerId: row.worker_id, + status: row.status, + sessionId: row.session_id, + openworkUrl: row.openwork_url, + errorMessage: row.error_message, + startedAt: row.started_at, + completedAt: row.completed_at, + createdAt: row.created_at, + updatedAt: row.updated_at, + } +} + +function parseCloudTaskIdParam(value: string) { + return normalizeDenTypeId("cloudTask", value) +} + +async function getCloudTaskByIdForOrg(taskId: CloudTaskRow["id"], orgId: CloudTaskRow["org_id"]) { + const rows = await db + .select() + .from(CloudTaskTable) + .where(and(eq(CloudTaskTable.id, taskId), eq(CloudTaskTable.org_id, orgId))) + .limit(1) + + return rows[0] ?? null +} + +function truncateErrorMessage(error: unknown) { + const message = error instanceof Error ? error.message : "cloud_task_run_failed" + return message.slice(0, 2048) +} + +async function markCloudTaskRunFailed(input: { + runId: CloudTaskRunRow["id"] + workerId: NonNullable + error: unknown +}) { + await db + .update(CloudTaskRunTable) + .set({ + status: "failed", + error_message: truncateErrorMessage(input.error), + completed_at: new Date(), + }) + .where(eq(CloudTaskRunTable.id, input.runId)) + + await db + .update(WorkerTable) + .set({ status: "failed" }) + .where(eq(WorkerTable.id, input.workerId)) +} + +async function continueCloudTaskRun(input: { + task: CloudTaskRow + runId: CloudTaskRunRow["id"] + workerId: NonNullable + hostToken: string + clientToken: string + activityToken: string + organizationContext: OrganizationContext +}) { + try { + const provisioned = await provisionWorker({ + workerId: input.workerId, + name: input.task.name, + hostToken: input.hostToken, + clientToken: input.clientToken, + activityToken: input.activityToken, + organizationId: input.organizationContext.organization.id, + memberId: input.organizationContext.currentMember.id, + memberTeamIds: memberTeamIdsForContext(input.organizationContext), + }) + + await db + .update(WorkerTable) + .set({ status: provisioned.status }) + .where(eq(WorkerTable.id, input.workerId)) + + await db.insert(WorkerInstanceTable).values({ + id: createDenTypeId("workerInstance"), + worker_id: input.workerId, + provider: provisioned.provider, + region: provisioned.region, + url: provisioned.url, + status: provisioned.status, + }) + + if (provisioned.status !== "healthy") { + throw new Error("Cloud worker is still provisioning and cannot accept the task run yet.") + } + + await db + .update(CloudTaskRunTable) + .set({ status: "running" }) + .where(eq(CloudTaskRunTable.id, input.runId)) + + const worker = await getWorkerByIdForOrg(input.workerId, input.task.org_id) + if (!worker) { + throw new Error("Cloud task worker record was not found after provisioning.") + } + + const access = await getWorkerTokensAndConnect(worker) + if ("error" in access && access.error) { + throw new Error(access.error.body.message) + } + + if (!access.connect?.openworkUrl || !access.connect.workspaceId) { + throw new Error("Worker runtime access is not ready yet. Wait for provisioning to finish and try again.") + } + + const job = await startWorkerBackgroundJob(buildCloudTaskRunJobInput({ + task: input.task, + openworkUrl: access.connect.openworkUrl, + clientToken: access.tokens.client, + })) + + await db + .update(CloudTaskRunTable) + .set({ + status: job.status, + session_id: job.sessionId, + openwork_url: job.openworkUrl, + }) + .where(eq(CloudTaskRunTable.id, input.runId)) + } catch (error) { + await markCloudTaskRunFailed({ + runId: input.runId, + workerId: input.workerId, + error, + }) + } +} + +async function startCloudTaskRun(input: { + task: CloudTaskRow + userId: string + organizationContext: OrganizationContext +}) { + const workerId = createDenTypeId("worker") + const runId = createDenTypeId("cloudTaskRun") + const hostToken = token() + const clientToken = token() + const activityToken = token() + const now = new Date() + const workerName = `Task ${input.task.name}`.slice(0, 255) + const runRow: CloudTaskRunRow = { + id: runId, + task_id: input.task.id, + org_id: input.task.org_id, + worker_id: workerId, + status: "provisioning", + session_id: null, + openwork_url: null, + error_message: null, + started_at: now, + completed_at: null, + created_at: now, + updated_at: now, + } + + await db.transaction(async (tx) => { + await tx.insert(WorkerTable).values({ + id: workerId, + org_id: input.task.org_id, + created_by_user_id: normalizeDenTypeId("user", input.userId), + name: workerName, + description: `Runtime created for cloud task ${input.task.id}`, + destination: "cloud", + status: "provisioning", + }) + + await tx.insert(WorkerTokenTable).values([ + { + id: createDenTypeId("workerToken"), + worker_id: workerId, + scope: "host", + token: hostToken, + }, + { + id: createDenTypeId("workerToken"), + worker_id: workerId, + scope: "client", + token: clientToken, + }, + { + id: createDenTypeId("workerToken"), + worker_id: workerId, + scope: "activity", + token: activityToken, + }, + ]) + + await tx.insert(CloudTaskRunTable).values(runRow) + await tx.update(CloudTaskTable).set({ last_run_id: runId }).where(eq(CloudTaskTable.id, input.task.id)) + }) + + void continueCloudTaskRun({ + task: input.task, + runId, + workerId, + hostToken, + clientToken, + activityToken, + organizationContext: input.organizationContext, + }) + + return runRow +} + +async function requireCloudTaskRunAccess(input: { + user: { id: string; email?: string | null; name?: string | null } + orgId: CloudTaskRow["org_id"] +}): Promise { + const email = getRequiredUserEmail(input.user) + if (!email) { + const allowed = false + const status = 400 + const error = "user_email_required" + return { allowed, status, body: { error } } + } + + const access = await requireCloudWorkerAccess({ + userId: normalizeDenTypeId("user", input.user.id), + email, + name: input.user.name ?? input.user.email ?? "OpenWork User", + }) + + if (!access.allowed) { + const allowed = false + const status = 402 + const error = "cloud_worker_billing_unavailable" + return { + allowed, + status, + body: { + error, + message: "Running cloud tasks requires an existing OpenWork Cloud plan. New self-serve purchases are no longer available.", + }, + } + } + + const workerLimit = await getOrganizationLimitStatus(input.orgId, "workers") + if (workerLimit.exceeded) { + const allowed = false + const status = 409 + const error = "org_limit_reached" + const limitType = "workers" + return { + allowed, + status, + body: { + error, + limitType, + limit: workerLimit.limit, + currentCount: workerLimit.currentCount, + message: `This workspace currently supports up to ${workerLimit.limit} workers. Contact support to increase the limit.`, + }, + } + } + + const allowed = true + return { allowed } +} + +export function registerCloudTaskRoutes(app: Hono) { + app.get( + "/v1/cloud-tasks", + describeRoute({ + tags: ["Cloud Tasks"], + summary: "List cloud tasks", + description: "Lists task-first OpenWork Cloud automations for the caller's active organization.", + responses: { + 200: jsonResponse("Cloud tasks returned successfully.", cloudTaskListResponseSchema), + 400: jsonResponse("The cloud task list query parameters were invalid.", invalidRequestSchema), + 401: jsonResponse("The caller must be signed in to list cloud tasks.", unauthorizedSchema), + }, + }), + requireUserMiddleware, + resolveOrganizationContextMiddleware, + queryValidator(listCloudTasksQuerySchema), + async (c) => { + const organizationContext = c.get("organizationContext") + const query = c.req.valid("query") + const rows = await db + .select() + .from(CloudTaskTable) + .where(eq(CloudTaskTable.org_id, organizationContext.organization.id)) + .orderBy(desc(CloudTaskTable.created_at)) + .limit(query.limit) + + return c.json({ tasks: rows.map(toCloudTaskResponse) }) + }, + ) + + app.post( + "/v1/cloud-tasks", + describeRoute({ + tags: ["Cloud Tasks"], + summary: "Create cloud task", + description: "Creates a lightweight scheduled cloud task. Runs provision a cloud worker dynamically with the caller's accessible provider configuration.", + responses: { + 201: jsonResponse("Cloud task created successfully.", cloudTaskResponseSchema), + 400: jsonResponse("The cloud task create payload was invalid.", invalidRequestSchema), + 401: jsonResponse("The caller must be signed in to create cloud tasks.", unauthorizedSchema), + }, + }), + requireUserMiddleware, + resolveOrganizationContextMiddleware, + jsonValidator(cloudTaskCreateSchema), + async (c) => { + const user = c.get("user") + const organizationContext = c.get("organizationContext") + const input = c.req.valid("json") + const schedule = normalizeCloudTaskSchedule(input.schedule) + const now = new Date() + const row: CloudTaskRow = { + id: createDenTypeId("cloudTask"), + org_id: organizationContext.organization.id, + created_by_user_id: normalizeDenTypeId("user", user.id), + created_by_member_id: organizationContext.currentMember.id, + name: input.name ?? defaultCloudTaskName(input.prompt), + prompt: input.prompt, + schedule_type: schedule.scheduleType, + schedule_time_of_day: schedule.scheduleTimeOfDay, + schedule_timezone: schedule.scheduleTimezone, + model_provider_id: input.model?.providerID ?? null, + model_id: input.model?.modelID ?? null, + agent: input.agent ?? null, + variant: input.variant ?? null, + enabled: input.enabled, + next_run_at: schedule.nextRunAt, + last_run_id: null, + created_at: now, + updated_at: now, + } + + await db.insert(CloudTaskTable).values(row) + + return c.json({ task: toCloudTaskResponse(row) }, 201) + }, + ) + + app.post( + "/v1/cloud-tasks/:id/runs", + describeRoute({ + tags: ["Cloud Tasks"], + summary: "Run cloud task", + description: "Starts a cloud task run by dynamically provisioning a cloud worker, seeding provider config, and dispatching the task prompt asynchronously.", + responses: { + 202: jsonResponse("Cloud task run accepted.", cloudTaskRunResponseSchema), + 400: jsonResponse("The cloud task run request was invalid.", z.union([invalidRequestSchema, userEmailRequiredSchema])), + 401: jsonResponse("The caller must be signed in to run cloud tasks.", unauthorizedSchema), + 402: jsonResponse("The caller needs an active cloud plan before running cloud tasks.", paymentRequiredSchema), + 404: jsonResponse("The cloud task could not be found.", notFoundSchema), + 409: jsonResponse("The cloud task cannot be run yet.", z.union([orgLimitReachedSchema, workerRuntimeUnavailableSchema])), + }, + }), + requireUserMiddleware, + resolveOrganizationContextMiddleware, + paramValidator(cloudTaskIdParamSchema), + async (c) => { + const user = c.get("user") + const organizationContext = c.get("organizationContext") + const params = c.req.valid("param") + const taskId = parseCloudTaskIdParam(params.id) + const task = await getCloudTaskByIdForOrg(taskId, organizationContext.organization.id) + + if (!task) { + return c.json({ error: "cloud_task_not_found" }, 404) + } + + if (!task.enabled) { + return c.json({ + error: "worker_runtime_unavailable", + message: "Cloud task is disabled.", + }, 409) + } + + const access = await requireCloudTaskRunAccess({ user, orgId: organizationContext.organization.id }) + if (!access.allowed) { + return c.json(access.body, access.status) + } + + const run = await startCloudTaskRun({ + task, + userId: user.id, + organizationContext, + }) + + return c.json({ + task: toCloudTaskResponse({ ...task, last_run_id: run.id, updated_at: new Date() }), + run: toCloudTaskRunResponse(run), + }, 202) + }, + ) +} diff --git a/ee/apps/den-api/src/routes/workers/core.ts b/ee/apps/den-api/src/routes/workers/core.ts index 4210def101..3a6cf1c876 100644 --- a/ee/apps/den-api/src/routes/workers/core.ts +++ b/ee/apps/den-api/src/routes/workers/core.ts @@ -11,7 +11,6 @@ import { denTypeIdSchema, emptyResponse, forbiddenSchema, invalidRequestSchema, import { getOrganizationLimitStatus } from "../../organization-limits.js" import type { OrganizationContext } from "../../orgs.js" import { getRequiredUserEmail } from "../../user.js" -import { startWorkerBackgroundJob } from "../../workers/background-jobs.js" import { buildDaytonaProviderSeedSummary, daytonaProviderSeedConfigPath, daytonaProviderSeedManifestPath } from "../../workers/daytona-provider-seed.js" import { loadMemberDaytonaProviderSeed } from "../../workers/daytona-provider-seed-loader.js" import type { WorkerRouteVariables } from "./shared.js" @@ -104,27 +103,6 @@ const workerProviderSeedPreviewResponseSchema = z.object({ manifestPath: z.string(), }).meta({ ref: "WorkerProviderSeedPreviewResponse" }) -const workerBackgroundJobCreateSchema = z.object({ - prompt: z.string().trim().min(1), - title: z.string().trim().min(1).max(255).optional(), - model: z.object({ - providerID: z.string().trim().min(1), - modelID: z.string().trim().min(1), - }).optional(), - agent: z.string().trim().min(1).optional(), - variant: z.string().trim().min(1).optional(), -}).meta({ ref: "WorkerBackgroundJobCreateRequest" }) - -const workerBackgroundJobResponseSchema = z.object({ - job: z.object({ - id: z.string(), - status: z.literal("accepted"), - workerId: denTypeIdSchema("worker"), - sessionId: z.string(), - openworkUrl: z.string(), - }), -}).meta({ ref: "WorkerBackgroundJobResponse" }) - const organizationUnavailableSchema = z.object({ error: z.literal("organization_unavailable"), }).meta({ ref: "OrganizationUnavailableError" }) @@ -390,87 +368,6 @@ export function registerWorkerCoreRoutes { - const orgId = c.get("activeOrganizationId") - const params = c.req.valid("param") - const input = c.req.valid("json") - - if (!orgId) { - return c.json({ error: "worker_not_found" }, 404) - } - - let workerId - try { - workerId = parseWorkerIdParam(params.id) - } catch { - return c.json({ error: "worker_not_found" }, 404) - } - - const worker = await getWorkerByIdForOrg(workerId, orgId) - if (!worker || worker.destination !== "cloud") { - return c.json({ error: "worker_not_found" }, 404) - } - - const access = await getWorkerTokensAndConnect(worker) - if ("error" in access && access.error) { - return c.json(access.error.body, 409) - } - - if (!access.connect?.openworkUrl || !access.connect.workspaceId) { - return c.json({ - error: "worker_runtime_unavailable", - message: "Worker runtime access is not ready yet. Wait for provisioning to finish and try again.", - }, 409) - } - - let job - try { - job = await startWorkerBackgroundJob({ - openworkUrl: access.connect.openworkUrl, - clientToken: access.tokens.client, - prompt: input.prompt, - title: input.title, - model: input.model, - agent: input.agent, - variant: input.variant, - }) - } catch (error) { - return c.json({ - error: "worker_runtime_unavailable", - message: error instanceof Error ? error.message : "Worker did not accept the background job.", - }, 409) - } - - return c.json({ - job: { - id: job.jobId, - status: job.status, - workerId: worker.id, - sessionId: job.sessionId, - openworkUrl: job.openworkUrl, - }, - }, 202) - }, - ) - app.get( "/v1/workers/:id", describeRoute({ diff --git a/ee/apps/den-api/src/workers/background-jobs.ts b/ee/apps/den-api/src/workers/background-jobs.ts index baf3d2e9e7..7012b0721f 100644 --- a/ee/apps/den-api/src/workers/background-jobs.ts +++ b/ee/apps/den-api/src/workers/background-jobs.ts @@ -13,6 +13,15 @@ export type WorkerBackgroundJobInput = { variant?: string } +type CloudTaskJobSource = { + name: string + prompt: string + model_provider_id: string | null + model_id: string | null + agent: string | null + variant: string | null +} + export type WorkerBackgroundJobResult = { jobId: string status: "accepted" @@ -71,6 +80,26 @@ export function buildBackgroundJobPromptBody(input: WorkerBackgroundJobInput) { } } +export function buildCloudTaskRunJobInput(input: { + task: CloudTaskJobSource + openworkUrl: string + clientToken: string +}): WorkerBackgroundJobInput { + const model = input.task.model_provider_id && input.task.model_id + ? { providerID: input.task.model_provider_id, modelID: input.task.model_id } + : undefined + + return { + openworkUrl: input.openworkUrl, + clientToken: input.clientToken, + prompt: input.task.prompt, + title: input.task.name, + ...(model ? { model } : {}), + ...(input.task.agent ? { agent: input.task.agent } : {}), + ...(input.task.variant ? { variant: input.task.variant } : {}), + } +} + async function fetchJson(input: { fetchImpl: WorkerBackgroundJobFetch url: string diff --git a/ee/apps/den-api/test/mcp-cloud-tasks.test.ts b/ee/apps/den-api/test/mcp-cloud-tasks.test.ts new file mode 100644 index 0000000000..d9dde13af0 --- /dev/null +++ b/ee/apps/den-api/test/mcp-cloud-tasks.test.ts @@ -0,0 +1,37 @@ +import { describe, expect, test } from "bun:test" +import { isMcpOperationAllowed, requiredScopeForMethod } from "../src/mcp/policy.js" + +describe("MCP cloud tasks", () => { + test("allows Cloud Tasks create and run operations through Cloud MCP", () => { + expect(isMcpOperationAllowed({ + method: "POST", + path: "/v1/cloud-tasks", + operation: { + operationId: "postCloudTasks", + tags: ["Cloud Tasks"], + }, + })).toBe(true) + + expect(isMcpOperationAllowed({ + method: "POST", + path: "/v1/cloud-tasks/{id}/runs", + operation: { + operationId: "postCloudTasksByIdRuns", + tags: ["Cloud Tasks"], + }, + })).toBe(true) + + expect(requiredScopeForMethod("POST")).toBe("mcp:write") + }) + + test("blocks the stale worker-bound background job operation", () => { + expect(isMcpOperationAllowed({ + method: "POST", + path: "/v1/workers/{id}/background-jobs", + operation: { + operationId: "postWorkersByIdBackgroundJobs", + tags: ["Workers"], + }, + })).toBe(false) + }) +}) diff --git a/ee/apps/den-api/test/mcp-worker-background-jobs.test.ts b/ee/apps/den-api/test/mcp-worker-background-jobs.test.ts deleted file mode 100644 index 6e6cd739c7..0000000000 --- a/ee/apps/den-api/test/mcp-worker-background-jobs.test.ts +++ /dev/null @@ -1,16 +0,0 @@ -import { describe, expect, test } from "bun:test" -import { isMcpOperationAllowed, requiredScopeForMethod } from "../src/mcp/policy.js" - -describe("MCP worker background jobs", () => { - test("allows Workers-tagged background job operations through Cloud MCP", () => { - expect(isMcpOperationAllowed({ - method: "POST", - path: "/v1/workers/{id}/background-jobs", - operation: { - operationId: "postV1WorkersByIdBackgroundJobs", - tags: ["Workers"], - }, - })).toBe(true) - expect(requiredScopeForMethod("POST")).toBe("mcp:write") - }) -}) diff --git a/ee/apps/den-api/test/worker-background-jobs.test.ts b/ee/apps/den-api/test/worker-background-jobs.test.ts index ec29a888a0..f0e19083b6 100644 --- a/ee/apps/den-api/test/worker-background-jobs.test.ts +++ b/ee/apps/den-api/test/worker-background-jobs.test.ts @@ -1,6 +1,7 @@ import { describe, expect, test } from "bun:test" import { buildBackgroundJobPromptBody, + buildCloudTaskRunJobInput, readOpencodeSessionId, startWorkerBackgroundJob, } from "../src/workers/background-jobs.js" @@ -36,6 +37,29 @@ describe("worker background jobs", () => { }) }) + test("builds worker job input from a cloud task row without changing secrets", () => { + expect(buildCloudTaskRunJobInput({ + task: { + name: "Daily repo review", + prompt: "Review the repo", + model_provider_id: "openwork", + model_id: "openwork/deepseek/deepseek-v4-flash", + agent: "openwork", + variant: "medium", + }, + openworkUrl: "https://worker.example/w/ws_1", + clientToken: "client-token", + })).toEqual({ + openworkUrl: "https://worker.example/w/ws_1", + clientToken: "client-token", + prompt: "Review the repo", + title: "Daily repo review", + model: { providerID: "openwork", modelID: "openwork/deepseek/deepseek-v4-flash" }, + agent: "openwork", + variant: "medium", + }) + }) + test("creates a cloud session and starts prompt_async through injected fetch", async () => { const calls: Array<{ url: string; body: unknown; authorization: string | null }> = [] const fetchImpl: typeof fetch = async (url, init) => { diff --git a/ee/packages/den-db/drizzle/0020_cloud_tasks.sql b/ee/packages/den-db/drizzle/0020_cloud_tasks.sql new file mode 100644 index 0000000000..4f858989bb --- /dev/null +++ b/ee/packages/den-db/drizzle/0020_cloud_tasks.sql @@ -0,0 +1,51 @@ +CREATE TABLE `cloud_task` ( + `id` varchar(64) NOT NULL, + `org_id` varchar(64) NOT NULL, + `created_by_user_id` varchar(64), + `created_by_member_id` varchar(64), + `name` varchar(255) NOT NULL, + `prompt` varchar(12000) NOT NULL, + `schedule_type` enum('manual','daily') NOT NULL, + `schedule_time_of_day` varchar(5), + `schedule_timezone` varchar(64), + `model_provider_id` varchar(255), + `model_id` varchar(255), + `agent` varchar(255), + `variant` varchar(255), + `enabled` boolean NOT NULL DEFAULT true, + `next_run_at` timestamp(3), + `last_run_id` varchar(64), + `created_at` timestamp(3) NOT NULL DEFAULT (now()), + `updated_at` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3), + CONSTRAINT `cloud_task_id` PRIMARY KEY(`id`) +); +--> statement-breakpoint +CREATE TABLE `cloud_task_run` ( + `id` varchar(64) NOT NULL, + `task_id` varchar(64) NOT NULL, + `org_id` varchar(64) NOT NULL, + `worker_id` varchar(64), + `status` enum('pending','provisioning','running','accepted','failed','cancelled') NOT NULL, + `session_id` varchar(128), + `openwork_url` varchar(2048), + `error_message` varchar(2048), + `started_at` timestamp(3), + `completed_at` timestamp(3), + `created_at` timestamp(3) NOT NULL DEFAULT (now()), + `updated_at` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3), + CONSTRAINT `cloud_task_run_id` PRIMARY KEY(`id`) +); +--> statement-breakpoint +CREATE INDEX `cloud_task_org_id` ON `cloud_task` (`org_id`); +--> statement-breakpoint +CREATE INDEX `cloud_task_created_by_user_id` ON `cloud_task` (`created_by_user_id`); +--> statement-breakpoint +CREATE INDEX `cloud_task_next_run_at` ON `cloud_task` (`next_run_at`); +--> statement-breakpoint +CREATE INDEX `cloud_task_run_task_id` ON `cloud_task_run` (`task_id`); +--> statement-breakpoint +CREATE INDEX `cloud_task_run_org_id` ON `cloud_task_run` (`org_id`); +--> statement-breakpoint +CREATE INDEX `cloud_task_run_worker_id` ON `cloud_task_run` (`worker_id`); +--> statement-breakpoint +CREATE INDEX `cloud_task_run_status` ON `cloud_task_run` (`status`); diff --git a/ee/packages/den-db/drizzle/meta/_journal.json b/ee/packages/den-db/drizzle/meta/_journal.json index ccb0953c31..2953ef61db 100644 --- a/ee/packages/den-db/drizzle/meta/_journal.json +++ b/ee/packages/den-db/drizzle/meta/_journal.json @@ -134,6 +134,13 @@ "when": 1779754557313, "tag": "0019_scim_sso_identity", "breakpoints": true + }, + { + "idx": 20, + "version": "5", + "when": 1780272000000, + "tag": "0020_cloud_tasks", + "breakpoints": true } ] -} \ No newline at end of file +} diff --git a/ee/packages/den-db/src/schema/workers.ts b/ee/packages/den-db/src/schema/workers.ts index a807ca8d85..3c4372f5c2 100644 --- a/ee/packages/den-db/src/schema/workers.ts +++ b/ee/packages/den-db/src/schema/workers.ts @@ -1,9 +1,11 @@ -import { index, json, mysqlEnum, mysqlTable, timestamp, uniqueIndex, varchar } from "drizzle-orm/mysql-core" +import { boolean, index, json, mysqlEnum, mysqlTable, timestamp, uniqueIndex, varchar } from "drizzle-orm/mysql-core" import { denTypeIdColumn, timestamps } from "../columns" export const WorkerDestination = ["local", "cloud"] as const export const WorkerStatus = ["provisioning", "healthy", "failed", "stopped"] as const export const TokenScope = ["client", "host", "activity"] as const +export const CloudTaskScheduleType = ["manual", "daily"] as const +export const CloudTaskRunStatus = ["pending", "provisioning", "running", "accepted", "failed", "cancelled"] as const export const WorkerTable = mysqlTable( "worker", @@ -89,6 +91,57 @@ export const WorkerBundleTable = mysqlTable( (table) => [index("worker_bundle_worker_id").on(table.worker_id)], ) +export const CloudTaskTable = mysqlTable( + "cloud_task", + { + id: denTypeIdColumn("cloudTask", "id").notNull().primaryKey(), + org_id: denTypeIdColumn("org", "org_id").notNull(), + created_by_user_id: denTypeIdColumn("user", "created_by_user_id"), + created_by_member_id: denTypeIdColumn("member", "created_by_member_id"), + name: varchar("name", { length: 255 }).notNull(), + prompt: varchar("prompt", { length: 12000 }).notNull(), + schedule_type: mysqlEnum("schedule_type", CloudTaskScheduleType).notNull(), + schedule_time_of_day: varchar("schedule_time_of_day", { length: 5 }), + schedule_timezone: varchar("schedule_timezone", { length: 64 }), + model_provider_id: varchar("model_provider_id", { length: 255 }), + model_id: varchar("model_id", { length: 255 }), + agent: varchar("agent", { length: 255 }), + variant: varchar("variant", { length: 255 }), + enabled: boolean("enabled").notNull().default(true), + next_run_at: timestamp("next_run_at", { fsp: 3 }), + last_run_id: denTypeIdColumn("cloudTaskRun", "last_run_id"), + ...timestamps, + }, + (table) => [ + index("cloud_task_org_id").on(table.org_id), + index("cloud_task_created_by_user_id").on(table.created_by_user_id), + index("cloud_task_next_run_at").on(table.next_run_at), + ], +) + +export const CloudTaskRunTable = mysqlTable( + "cloud_task_run", + { + id: denTypeIdColumn("cloudTaskRun", "id").notNull().primaryKey(), + task_id: denTypeIdColumn("cloudTask", "task_id").notNull(), + org_id: denTypeIdColumn("org", "org_id").notNull(), + worker_id: denTypeIdColumn("worker", "worker_id"), + status: mysqlEnum("status", CloudTaskRunStatus).notNull(), + session_id: varchar("session_id", { length: 128 }), + openwork_url: varchar("openwork_url", { length: 2048 }), + error_message: varchar("error_message", { length: 2048 }), + started_at: timestamp("started_at", { fsp: 3 }), + completed_at: timestamp("completed_at", { fsp: 3 }), + ...timestamps, + }, + (table) => [ + index("cloud_task_run_task_id").on(table.task_id), + index("cloud_task_run_org_id").on(table.org_id), + index("cloud_task_run_worker_id").on(table.worker_id), + index("cloud_task_run_status").on(table.status), + ], +) + export const AuditEventTable = mysqlTable( "audit_event", { diff --git a/ee/packages/utils/src/typeid.ts b/ee/packages/utils/src/typeid.ts index 4c47845df1..d3b0583e53 100644 --- a/ee/packages/utils/src/typeid.ts +++ b/ee/packages/utils/src/typeid.ts @@ -73,6 +73,8 @@ export const idTypesMapNameToPrefix = { daytonaSandbox: "dts", workerToken: "wkt", workerBundle: "wkb", + cloudTask: "ctk", + cloudTaskRun: "ctr", auditEvent: "aev", telemetryEvent: "tev", } as const