Skip to content

Commit 375e2b0

Browse files
authored
Native Data Loader (#112)
* interpreted engine * Compiler * Enhance native batching: document feature, improve error handling, and update tests for partial failures
1 parent fc836e4 commit 375e2b0

10 files changed

Lines changed: 859 additions & 59 deletions

File tree

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
---
2+
"@stackables/bridge": minor
3+
"@stackables/bridge-core": minor
4+
"@stackables/bridge-types": minor
5+
---
6+
7+
Improve native batched tool authoring by documenting the feature, exporting dedicated batch tool types, and simplifying the batch contract to plain input arrays.
8+
9+
Batch tools now receive `Input[]` and must return `Output[]` in matching order. Batched tool tracing and logging are also emitted once per flushed batch call instead of once per queued item.
10+
11+
Native batching now works in compiled execution as well as the runtime interpreter. Batch tools can also signal partial failures by returning an `Error` at a specific result index, which rejects only that item and allows normal wire-level `catch` fallbacks to handle it.

packages/bridge-compiler/src/codegen.ts

Lines changed: 211 additions & 50 deletions
Large diffs are not rendered by default.

packages/bridge-compiler/test/codegen.test.ts

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1718,6 +1718,72 @@ bridge Query.test {
17181718
assert.deepStrictEqual(result.traces, []);
17191719
assert.equal(result.data.name, "Alice");
17201720
});
1721+
1722+
test("batched tools execute natively in compiled mode", async () => {
1723+
const document = parseBridgeFormat(`version 1.5
1724+
bridge Query.users {
1725+
with context as ctx
1726+
with output as o
1727+
1728+
o <- ctx.userIds[] as userId {
1729+
with app.fetchUser as user
1730+
1731+
user.id <- userId
1732+
.id <- userId
1733+
.name <- user.name
1734+
}
1735+
}`);
1736+
1737+
let batchCalls = 0;
1738+
const warnings: string[] = [];
1739+
const infos: Array<{ tool: string; fn: string; durationMs: number }> = [];
1740+
1741+
const fetchUser = Object.assign(
1742+
async (inputs: Array<{ id: string }>) => {
1743+
batchCalls++;
1744+
return inputs.map((input) => ({ name: `user:${input.id}` }));
1745+
},
1746+
{
1747+
bridge: {
1748+
batch: true,
1749+
log: { execution: "info" as const },
1750+
},
1751+
},
1752+
);
1753+
1754+
const result = await executeAot<any>({
1755+
document,
1756+
operation: "Query.users",
1757+
tools: {
1758+
app: { fetchUser },
1759+
},
1760+
context: {
1761+
userIds: ["u1", "u2", "u3"],
1762+
},
1763+
trace: "full",
1764+
logger: {
1765+
warn: (message: string) => warnings.push(message),
1766+
info: (meta: { tool: string; fn: string; durationMs: number }) => {
1767+
infos.push(meta);
1768+
},
1769+
},
1770+
});
1771+
1772+
assert.deepStrictEqual(result.data, [
1773+
{ id: "u1", name: "user:u1" },
1774+
{ id: "u2", name: "user:u2" },
1775+
{ id: "u3", name: "user:u3" },
1776+
]);
1777+
assert.equal(batchCalls, 1);
1778+
assert.deepStrictEqual(warnings, []);
1779+
assert.equal(result.traces.length, 1);
1780+
assert.deepStrictEqual(result.traces[0]!.input, [
1781+
{ id: "u1" },
1782+
{ id: "u2" },
1783+
{ id: "u3" },
1784+
]);
1785+
assert.equal(infos.length, 1);
1786+
});
17211787
});
17221788

17231789
// ── Parallel scheduling ──────────────────────────────────────────────────────

packages/bridge-core/src/ExecutionTree.ts

Lines changed: 219 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import {
55
trunkDependsOnElement,
66
} from "./scheduleTools.ts";
77
import { internal } from "./tools/index.ts";
8-
import type { ToolTrace } from "./tracing.ts";
8+
import type { EffectiveToolLog, ToolTrace } from "./tracing.ts";
99
import {
1010
isOtelActive,
1111
logToolError,
@@ -89,6 +89,20 @@ function stableMemoizeKey(value: unknown): string {
8989
.join(",")}}`;
9090
}
9191

92+
type PendingBatchToolCall = {
93+
input: Record<string, any>;
94+
resolve: (value: any) => void;
95+
reject: (err: unknown) => void;
96+
};
97+
98+
type BatchToolQueue = {
99+
items: PendingBatchToolCall[];
100+
scheduled: boolean;
101+
toolName: string;
102+
fnName: string;
103+
maxBatchSize?: number;
104+
};
105+
92106
export class ExecutionTree implements TreeContext {
93107
state: Record<string, any> = {};
94108
bridge: Bridge | undefined;
@@ -123,6 +137,9 @@ export class ExecutionTree implements TreeContext {
123137
/** Per-tool memoization caches keyed by stable input fingerprints. */
124138
private toolMemoCache: Map<string, Map<string, MaybePromise<any>>> =
125139
new Map();
140+
/** Per-request batch queues for tools declared with `.bridge.batch`. */
141+
private toolBatchQueues: Map<(...args: any[]) => any, BatchToolQueue> =
142+
new Map();
126143
/** Promise that resolves when all critical `force` handles have settled. */
127144
private forcedExecution?: Promise<void>;
128145
/** Shared trace collector — present only when tracing is enabled. */
@@ -305,7 +322,21 @@ export class ExecutionTree implements TreeContext {
305322
};
306323

307324
const timeoutMs = this.toolTimeoutMs;
308-
const { sync: isSyncTool, doTrace, log } = resolveToolMeta(fnImpl);
325+
const { sync: isSyncTool, batch, doTrace, log } = resolveToolMeta(fnImpl);
326+
327+
if (batch) {
328+
return this.callBatchedTool(
329+
toolName,
330+
fnName,
331+
fnImpl,
332+
input,
333+
timeoutMs,
334+
toolContext,
335+
doTrace,
336+
log,
337+
batch.maxBatchSize,
338+
);
339+
}
309340

310341
// ── Fast path: no instrumentation configured ──────────────────
311342
// When there is no internal tracer, no logger, and OpenTelemetry
@@ -480,6 +511,191 @@ export class ExecutionTree implements TreeContext {
480511
);
481512
}
482513

514+
private callBatchedTool(
515+
toolName: string,
516+
fnName: string,
517+
fnImpl: (...args: any[]) => any,
518+
input: Record<string, any>,
519+
timeoutMs: number,
520+
toolContext: ToolContext,
521+
doTrace: boolean,
522+
log: EffectiveToolLog,
523+
maxBatchSize?: number,
524+
): Promise<any> {
525+
let queue = this.toolBatchQueues.get(fnImpl);
526+
if (!queue) {
527+
queue = {
528+
items: [],
529+
scheduled: false,
530+
toolName,
531+
fnName,
532+
maxBatchSize,
533+
};
534+
this.toolBatchQueues.set(fnImpl, queue);
535+
}
536+
537+
if (maxBatchSize !== undefined) {
538+
queue.maxBatchSize = maxBatchSize;
539+
}
540+
541+
return new Promise((resolve, reject) => {
542+
queue!.items.push({ input, resolve, reject });
543+
if (queue!.scheduled) return;
544+
queue!.scheduled = true;
545+
queueMicrotask(() => {
546+
void this.flushBatchedToolQueue(
547+
fnImpl,
548+
toolContext,
549+
timeoutMs,
550+
doTrace,
551+
log,
552+
);
553+
});
554+
});
555+
}
556+
557+
private async flushBatchedToolQueue(
558+
fnImpl: (...args: any[]) => any,
559+
toolContext: ToolContext,
560+
timeoutMs: number,
561+
doTrace: boolean,
562+
log: EffectiveToolLog,
563+
): Promise<void> {
564+
const queue = this.toolBatchQueues.get(fnImpl);
565+
if (!queue) return;
566+
567+
const pending = queue.items.splice(0, queue.items.length);
568+
queue.scheduled = false;
569+
if (pending.length === 0) return;
570+
571+
if (this.signal?.aborted) {
572+
const abortErr = new BridgeAbortError();
573+
for (const item of pending) item.reject(abortErr);
574+
return;
575+
}
576+
577+
const chunkSize =
578+
queue.maxBatchSize && queue.maxBatchSize > 0
579+
? Math.floor(queue.maxBatchSize)
580+
: pending.length;
581+
582+
for (let start = 0; start < pending.length; start += chunkSize) {
583+
const chunk = pending.slice(start, start + chunkSize);
584+
const batchInput = chunk.map((item) => item.input);
585+
const tracer = this.tracer;
586+
const logger = this.logger;
587+
const metricAttrs = {
588+
"bridge.tool.name": queue.toolName,
589+
"bridge.tool.fn": queue.fnName,
590+
};
591+
592+
try {
593+
const executeBatch = async () => {
594+
const batchResult = fnImpl(batchInput, toolContext);
595+
return timeoutMs > 0 && isPromise(batchResult)
596+
? await raceTimeout(batchResult, timeoutMs, queue.toolName)
597+
: await batchResult;
598+
};
599+
600+
const resolved =
601+
!tracer && !logger && !isOtelActive()
602+
? await executeBatch()
603+
: await withSpan(
604+
doTrace,
605+
`bridge.tool.${queue.toolName}.${queue.fnName}`,
606+
metricAttrs,
607+
async (span) => {
608+
const traceStart = tracer?.now();
609+
const wallStart = performance.now();
610+
try {
611+
const result = await executeBatch();
612+
const durationMs = roundMs(performance.now() - wallStart);
613+
toolCallCounter.add(1, metricAttrs);
614+
toolDurationHistogram.record(durationMs, metricAttrs);
615+
if (tracer && traceStart != null) {
616+
tracer.record(
617+
tracer.entry({
618+
tool: queue.toolName,
619+
fn: queue.fnName,
620+
input: batchInput,
621+
output: result,
622+
durationMs: roundMs(tracer.now() - traceStart),
623+
startedAt: traceStart,
624+
}),
625+
);
626+
}
627+
logToolSuccess(
628+
logger,
629+
log.execution,
630+
queue.toolName,
631+
queue.fnName,
632+
durationMs,
633+
);
634+
return result;
635+
} catch (err) {
636+
const durationMs = roundMs(performance.now() - wallStart);
637+
toolCallCounter.add(1, metricAttrs);
638+
toolDurationHistogram.record(durationMs, metricAttrs);
639+
toolErrorCounter.add(1, metricAttrs);
640+
if (tracer && traceStart != null) {
641+
tracer.record(
642+
tracer.entry({
643+
tool: queue.toolName,
644+
fn: queue.fnName,
645+
input: batchInput,
646+
error: (err as Error).message,
647+
durationMs: roundMs(tracer.now() - traceStart),
648+
startedAt: traceStart,
649+
}),
650+
);
651+
}
652+
recordSpanError(span, err as Error);
653+
logToolError(
654+
logger,
655+
log.errors,
656+
queue.toolName,
657+
queue.fnName,
658+
err as Error,
659+
);
660+
if (
661+
this.signal?.aborted &&
662+
err instanceof DOMException &&
663+
err.name === "AbortError"
664+
) {
665+
throw new BridgeAbortError();
666+
}
667+
throw err;
668+
} finally {
669+
span?.end();
670+
}
671+
},
672+
);
673+
674+
if (!Array.isArray(resolved)) {
675+
throw new Error(
676+
`Batch tool "${queue.fnName}" must return an array of results`,
677+
);
678+
}
679+
if (resolved.length !== chunk.length) {
680+
throw new Error(
681+
`Batch tool "${queue.fnName}" returned ${resolved.length} results for ${chunk.length} queued calls`,
682+
);
683+
}
684+
685+
for (let i = 0; i < chunk.length; i++) {
686+
const value = resolved[i];
687+
if (value instanceof Error) {
688+
chunk[i]!.reject(value);
689+
} else {
690+
chunk[i]!.resolve(value);
691+
}
692+
}
693+
} catch (err) {
694+
for (const item of chunk) item.reject(err);
695+
}
696+
}
697+
}
698+
483699
shadow(): ExecutionTree {
484700
// Lightweight: bypass the constructor to avoid redundant work that
485701
// re-derives data identical to the parent (bridge lookup, pipeHandleMap,
@@ -505,6 +721,7 @@ export class ExecutionTree implements TreeContext {
505721
child.handleVersionMap = this.handleVersionMap;
506722
child.memoizedToolKeys = this.memoizedToolKeys;
507723
child.toolMemoCache = this.toolMemoCache;
724+
child.toolBatchQueues = this.toolBatchQueues;
508725
child.toolFns = this.toolFns;
509726
child.elementTrunkKey = this.elementTrunkKey;
510727
child.tracer = this.tracer;

packages/bridge-core/src/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ export { SELF_MODULE } from "./types.ts";
5858
export type {
5959
Bridge,
6060
BridgeDocument,
61+
BatchToolCallFn,
62+
BatchToolFn,
6163
CacheStore,
6264
ConstDef,
6365
ControlFlowInstruction,
@@ -66,6 +68,8 @@ export type {
6668
Instruction,
6769
NodeRef,
6870
SourceLocation,
71+
ScalarToolCallFn,
72+
ScalarToolFn,
6973
ToolCallFn,
7074
ToolContext,
7175
ToolDef,

packages/bridge-core/src/tracing.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import { metrics, trace } from "@opentelemetry/api";
99
import type { Span } from "@opentelemetry/api";
1010
import type { ToolMetadata } from "@stackables/bridge-types";
11+
import type { BatchToolMetadata } from "@stackables/bridge-types";
1112
import type { Logger } from "./tree-types.ts";
1213
import { roundMs } from "./tree-utils.ts";
1314

@@ -248,6 +249,8 @@ export type EffectiveToolLog = {
248249
export type ResolvedToolMeta = {
249250
/** Whether the tool declares synchronous execution. */
250251
sync: boolean;
252+
/** Batch mode contract, when declared. */
253+
batch?: BatchToolMetadata;
251254
/** Emit an OTel span for this call. Default: `true`. */
252255
doTrace: boolean;
253256
log: EffectiveToolLog;
@@ -271,6 +274,12 @@ export function resolveToolMeta(fn: (...args: any[]) => any): ResolvedToolMeta {
271274
const bridge = (fn as any).bridge as ToolMetadata | undefined;
272275
return {
273276
sync: bridge?.sync === true,
277+
batch:
278+
bridge?.batch === true
279+
? {}
280+
: typeof bridge?.batch === "object"
281+
? bridge.batch
282+
: undefined,
274283
doTrace: bridge?.trace !== false,
275284
log: resolveToolLog(bridge),
276285
};

0 commit comments

Comments
 (0)