Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 258 additions & 7 deletions apps/server/src/git/Layers/GitCore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,27 @@ import {
Layer,
Option,
Path,
PlatformError,
Ref,
Result,
Schema,
Scope,
Semaphore,
Stream,
} from "effect";
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process";

import { GitCommandError } from "../Errors.ts";
import {
GitCore,
type ExecuteGitProgress,
type GitCommitOptions,
type GitCoreShape,
type ExecuteGitInput,
type ExecuteGitResult,
} from "../Services/GitCore.ts";
import { ServerConfig } from "../../config.ts";
import { decodeJsonResult } from "@t3tools/shared/schemaJson";

const DEFAULT_TIMEOUT_MS = 30_000;
const DEFAULT_MAX_OUTPUT_BYTES = 1_000_000;
Expand All @@ -29,6 +37,11 @@ const STATUS_UPSTREAM_REFRESH_TIMEOUT = Duration.seconds(5);
const STATUS_UPSTREAM_REFRESH_CACHE_CAPACITY = 2_048;
const DEFAULT_BASE_BRANCH_CANDIDATES = ["main", "master"] as const;

type TraceTailState = {
processedChars: number;
remainder: string;
};

class StatusUpstreamRefreshCacheKey extends Data.Class<{
cwd: string;
upstreamRef: string;
Expand All @@ -40,6 +53,7 @@ interface ExecuteGitOptions {
timeoutMs?: number | undefined;
allowNonZeroExit?: boolean | undefined;
fallbackErrorMessage?: string | undefined;
progress?: ExecuteGitProgress | undefined;
}

function parseBranchAb(value: string): { ahead: number; behind: number } {
Expand Down Expand Up @@ -257,14 +271,201 @@ function toGitCommandError(
});
}

interface Trace2Monitor {
readonly env: NodeJS.ProcessEnv;
readonly flush: Effect.Effect<void, never>;
}

function trace2ChildKey(record: Record<string, unknown>): string | null {
const childId = record.child_id;
if (typeof childId === "number" || typeof childId === "string") {
return String(childId);
}
const hookName = record.hook_name;
return typeof hookName === "string" && hookName.trim().length > 0 ? hookName.trim() : null;
}

const Trace2Record = Schema.Record(Schema.String, Schema.Unknown);

const createTrace2Monitor = Effect.fn(function* (
input: Pick<ExecuteGitInput, "operation" | "cwd" | "args">,
progress: ExecuteGitProgress | undefined,
): Effect.fn.Return<
Trace2Monitor,
PlatformError.PlatformError,
Scope.Scope | FileSystem.FileSystem | Path.Path
> {
if (!progress?.onHookStarted && !progress?.onHookFinished) {
return {
env: {},
flush: Effect.void,
};
}

const fs = yield* FileSystem.FileSystem;
const path = yield* Path.Path;
const traceFilePath = yield* fs.makeTempFileScoped({
prefix: `t3code-git-trace2-${process.pid}-`,
suffix: ".json",
});
const hookStartByChildKey = new Map<string, { hookName: string; startedAtMs: number }>();
const traceTailState = yield* Ref.make<TraceTailState>({
processedChars: 0,
remainder: "",
});

const handleTraceLine = (line: string) =>
Effect.gen(function* () {
const trimmedLine = line.trim();
if (trimmedLine.length === 0) {
return;
}

const traceRecord = decodeJsonResult(Trace2Record)(trimmedLine);
if (Result.isFailure(traceRecord)) {
yield* Effect.logDebug(
`GitCore.trace2: failed to parse trace line for ${quoteGitCommand(input.args)} in ${input.cwd}`,
traceRecord.failure,
);
return;
}

if (traceRecord.success.child_class !== "hook") {
return;
}

const event = traceRecord.success.event;
const childKey = trace2ChildKey(traceRecord.success);
if (childKey === null) {
return;
}
const started = hookStartByChildKey.get(childKey);
const hookNameFromEvent =
typeof traceRecord.success.hook_name === "string"
? traceRecord.success.hook_name.trim()
: "";
const hookName = hookNameFromEvent.length > 0 ? hookNameFromEvent : (started?.hookName ?? "");
if (hookName.length === 0) {
return;
}

if (event === "child_start") {
hookStartByChildKey.set(childKey, { hookName, startedAtMs: Date.now() });
if (progress.onHookStarted) {
yield* progress.onHookStarted(hookName);
}
return;
}

if (event === "child_exit") {
hookStartByChildKey.delete(childKey);
if (progress.onHookFinished) {
const code = traceRecord.success.code;
yield* progress.onHookFinished({
hookName: started?.hookName ?? hookName,
exitCode: typeof code === "number" && Number.isInteger(code) ? code : null,
durationMs: started ? Math.max(0, Date.now() - started.startedAtMs) : null,
});
}
}
});

const deltaMutex = yield* Semaphore.make(1);
const readTraceDelta = deltaMutex.withPermit(
fs.readFileString(traceFilePath).pipe(
Effect.flatMap((contents) =>
Effect.uninterruptible(
Ref.modify(traceTailState, ({ processedChars, remainder }) => {
if (contents.length <= processedChars) {
return [[], { processedChars, remainder }];
}

const appended = contents.slice(processedChars);
const combined = remainder + appended;
const lines = combined.split("\n");
const nextRemainder = lines.pop() ?? "";

return [
lines.map((line) => line.replace(/\r$/, "")),
{
processedChars: contents.length,
remainder: nextRemainder,
},
];
}).pipe(
Effect.flatMap((lines) => Effect.forEach(lines, handleTraceLine, { discard: true })),
),
),
),
Effect.ignore({ log: true }),
),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trace file fully re-read on every watch event

Low Severity

readTraceDelta calls fs.readFileString(traceFilePath) on every file-system watch event, re-reading the entire file from the start, then slicing from processedChars. As the trace file grows during a long-running commit with multiple hooks, the cumulative bytes read grows quadratically in the file size. Using a byte-offset–based read (or keeping a file handle open for incremental reads) would avoid re-reading already-processed content.

Fix in Cursor Fix in Web

);
const traceFileName = path.basename(traceFilePath);
yield* Stream.runForEach(fs.watch(traceFilePath), (event) => {
const eventPath = event.path;
const isTargetTraceEvent =
eventPath === traceFilePath ||
eventPath === traceFileName ||
path.basename(eventPath) === traceFileName;
if (!isTargetTraceEvent) return Effect.void;
return readTraceDelta;
}).pipe(Effect.ignoreCause({ log: true }), Effect.forkScoped);

yield* Effect.addFinalizer(() =>
Effect.gen(function* () {
yield* readTraceDelta;
const finalLine = yield* Ref.modify(traceTailState, ({ processedChars, remainder }) => [
remainder.trim(),
{
processedChars,
remainder: "",
},
]);
if (finalLine.length > 0) {
yield* handleTraceLine(finalLine);
}
}),
);

return {
env: {
GIT_TRACE2_EVENT: traceFilePath,
},
flush: readTraceDelta,
};
});

const collectOutput = Effect.fn(function* <E>(
input: Pick<ExecuteGitInput, "operation" | "cwd" | "args">,
stream: Stream.Stream<Uint8Array, E>,
maxOutputBytes: number,
onLine: ((line: string) => Effect.Effect<void, never>) | undefined,
): Effect.fn.Return<string, GitCommandError> {
const decoder = new TextDecoder();
let bytes = 0;
let text = "";
let lineBuffer = "";

const emitCompleteLines = (flush: boolean) =>
Effect.gen(function* () {
let newlineIndex = lineBuffer.indexOf("\n");
while (newlineIndex >= 0) {
const line = lineBuffer.slice(0, newlineIndex).replace(/\r$/, "");
lineBuffer = lineBuffer.slice(newlineIndex + 1);
if (line.length > 0 && onLine) {
yield* onLine(line);
}
newlineIndex = lineBuffer.indexOf("\n");
}

if (flush) {
const trailing = lineBuffer.replace(/\r$/, "");
lineBuffer = "";
if (trailing.length > 0 && onLine) {
yield* onLine(trailing);
}
}
});

yield* Stream.runForEach(stream, (chunk) =>
Effect.gen(function* () {
Expand All @@ -277,11 +478,17 @@ const collectOutput = Effect.fn(function* <E>(
detail: `${quoteGitCommand(input.args)} output exceeded ${maxOutputBytes} bytes and was truncated.`,
});
}
text += decoder.decode(chunk, { stream: true });
const decoded = decoder.decode(chunk, { stream: true });
text += decoded;
lineBuffer += decoded;
yield* emitCompleteLines(false);
}),
).pipe(Effect.mapError(toGitCommandError(input, "output stream failed.")));

text += decoder.decode();
const remainder = decoder.decode();
text += remainder;
lineBuffer += remainder;
yield* emitCompleteLines(true);
return text;
});

Expand All @@ -306,26 +513,46 @@ export const makeGitCore = (options?: { executeOverride?: GitCoreShape["execute"
const maxOutputBytes = input.maxOutputBytes ?? DEFAULT_MAX_OUTPUT_BYTES;

const commandEffect = Effect.gen(function* () {
const trace2Monitor = yield* createTrace2Monitor(commandInput, input.progress).pipe(
Effect.provideService(Path.Path, path),
Effect.provideService(FileSystem.FileSystem, fileSystem),
Effect.mapError(toGitCommandError(commandInput, "failed to create trace2 monitor.")),
);
const child = yield* commandSpawner
.spawn(
ChildProcess.make("git", commandInput.args, {
cwd: commandInput.cwd,
...(input.env ? { env: input.env } : {}),
env: {
...process.env,
...input.env,
...trace2Monitor.env,
},
}),
)
.pipe(Effect.mapError(toGitCommandError(commandInput, "failed to spawn.")));

const [stdout, stderr, exitCode] = yield* Effect.all(
[
collectOutput(commandInput, child.stdout, maxOutputBytes),
collectOutput(commandInput, child.stderr, maxOutputBytes),
collectOutput(
commandInput,
child.stdout,
maxOutputBytes,
input.progress?.onStdoutLine,
),
collectOutput(
commandInput,
child.stderr,
maxOutputBytes,
input.progress?.onStderrLine,
),
child.exitCode.pipe(
Effect.map((value) => Number(value)),
Effect.mapError(toGitCommandError(commandInput, "failed to report exit code.")),
),
],
{ concurrency: "unbounded" },
);
yield* trace2Monitor.flush;

if (!input.allowNonZeroExit && exitCode !== 0) {
const trimmedStderr = stderr.trim();
Expand Down Expand Up @@ -376,6 +603,7 @@ export const makeGitCore = (options?: { executeOverride?: GitCoreShape["execute"
args,
allowNonZeroExit: true,
...(options.timeoutMs !== undefined ? { timeoutMs: options.timeoutMs } : {}),
...(options.progress ? { progress: options.progress } : {}),
}).pipe(
Effect.flatMap((result) => {
if (options.allowNonZeroExit || result.code === 0) {
Expand Down Expand Up @@ -947,14 +1175,37 @@ export const makeGitCore = (options?: { executeOverride?: GitCoreShape["execute"
};
});

const commit: GitCoreShape["commit"] = (cwd, subject, body) =>
const commit: GitCoreShape["commit"] = (cwd, subject, body, options?: GitCommitOptions) =>
Effect.gen(function* () {
const args = ["commit", "-m", subject];
const trimmedBody = body.trim();
if (trimmedBody.length > 0) {
args.push("-m", trimmedBody);
}
yield* runGit("GitCore.commit.commit", cwd, args);
const progress = options?.progress
? {
...(options.progress.onOutputLine
? {
onStdoutLine: (line: string) =>
options.progress?.onOutputLine?.({ stream: "stdout", text: line }) ??
Effect.void,
onStderrLine: (line: string) =>
options.progress?.onOutputLine?.({ stream: "stderr", text: line }) ??
Effect.void,
}
: {}),
...(options.progress.onHookStarted
? { onHookStarted: options.progress.onHookStarted }
: {}),
...(options.progress.onHookFinished
? { onHookFinished: options.progress.onHookFinished }
: {}),
}
: null;
yield* executeGit("GitCore.commit.commit", cwd, args, {
...(options?.timeoutMs !== undefined ? { timeoutMs: options.timeoutMs } : {}),
...(progress ? { progress } : {}),
}).pipe(Effect.asVoid);
const commitSha = yield* runGitStdout("GitCore.commit.revParseHead", cwd, [
"rev-parse",
"HEAD",
Expand Down
Loading