diff --git a/src/commands/watch.ts b/src/commands/watch.ts index 44b71d3..963c479 100644 --- a/src/commands/watch.ts +++ b/src/commands/watch.ts @@ -3,6 +3,7 @@ import * as path from "node:path"; import { Command } from "commander"; import { createFileSystem, createStore } from "../lib/context"; import { DEFAULT_IGNORE_PATTERNS } from "../lib/file"; +import { acquireLock, isLocked, releaseLock } from "../lib/lock"; import { createIndexingSpinner, formatDryRunSummary, @@ -19,11 +20,41 @@ export async function startWatch(options: { dryRun: boolean; }): Promise { let refreshInterval: NodeJS.Timeout | undefined; + const watchRoot = process.cwd(); + + const skipSync = isLocked(watchRoot); + if (skipSync) { + console.log( + "Another watch process is already syncing this directory. Skipping initial sync.", + ); + } + + const hasLock = acquireLock(watchRoot); + if (!hasLock && !skipSync) { + console.log( + "Another watch process is already syncing this directory. Skipping initial sync.", + ); + } + + const cleanup = (): void => { + if (hasLock) { + releaseLock(watchRoot); + } + }; + + process.on("exit", cleanup); + process.on("SIGINT", () => { + cleanup(); + process.exit(0); + }); + process.on("SIGTERM", () => { + cleanup(); + process.exit(0); + }); try { const store = await createStore(); - // Refresh JWT token every 5 minutes (before 15-minute expiration) if (!options.dryRun) { const REFRESH_INTERVAL = 5 * 60 * 1000; refreshInterval = setInterval(async () => { @@ -36,73 +67,73 @@ export async function startWatch(options: { ); } }, REFRESH_INTERVAL); - // Allow process to exit even if interval is active (fs.watch keeps it alive anyway) refreshInterval.unref(); } const fileSystem = createFileSystem({ ignorePatterns: [...DEFAULT_IGNORE_PATTERNS], }); - const watchRoot = process.cwd(); console.debug("Watching for file changes in", watchRoot); - const { spinner, onProgress } = createIndexingSpinner(watchRoot); - try { + if (hasLock) { + const { spinner, onProgress } = createIndexingSpinner(watchRoot); try { - await store.retrieve(options.store); - } catch { - await store.create({ - name: options.store, - description: - "mgrep store - Mixedbreads multimodal multilingual magic search", - }); - } - const result = await initialSync( - store, - fileSystem, - options.store, - watchRoot, - options.dryRun, - onProgress, - ); - const deletedInfo = - result.deleted > 0 ? ` • deleted ${result.deleted}` : ""; - const errorsInfo = - result.errors > 0 ? ` • errors ${result.errors}` : ""; - if (result.errors > 0) { - spinner.warn( - `Initial sync complete (${result.processed}/${result.total}) • uploaded ${result.uploaded}${deletedInfo}${errorsInfo}`, - ); - console.error( - `\n⚠️ ${result.errors} file(s) failed to upload. Run with DEBUG=mgrep* for more details.`, - ); - } else { - spinner.succeed( - `Initial sync complete (${result.processed}/${result.total}) • uploaded ${result.uploaded}${deletedInfo}`, - ); - } - if (options.dryRun) { - console.log( - formatDryRunSummary(result, { - actionDescription: "found", - includeTotal: true, - }), - ); - return; - } - } catch (e) { - if (e instanceof QuotaExceededError) { - spinner.fail("Quota exceeded"); - console.error( - "\n❌ Free tier quota exceeded. You've reached the monthly limit of 2,000,000 store tokens.", - ); - console.error( - " Upgrade your plan at https://platform.mixedbread.com to continue syncing.\n", + try { + await store.retrieve(options.store); + } catch { + await store.create({ + name: options.store, + description: + "mgrep store - Mixedbreads multimodal multilingual magic search", + }); + } + const result = await initialSync( + store, + fileSystem, + options.store, + watchRoot, + options.dryRun, + onProgress, ); - process.exit(1); + const deletedInfo = + result.deleted > 0 ? ` • deleted ${result.deleted}` : ""; + const errorsInfo = + result.errors > 0 ? ` • errors ${result.errors}` : ""; + if (result.errors > 0) { + spinner.warn( + `Initial sync complete (${result.processed}/${result.total}) • uploaded ${result.uploaded}${deletedInfo}${errorsInfo}`, + ); + console.error( + `\n⚠️ ${result.errors} file(s) failed to upload. Run with DEBUG=mgrep* for more details.`, + ); + } else { + spinner.succeed( + `Initial sync complete (${result.processed}/${result.total}) • uploaded ${result.uploaded}${deletedInfo}`, + ); + } + if (options.dryRun) { + console.log( + formatDryRunSummary(result, { + actionDescription: "found", + includeTotal: true, + }), + ); + return; + } + } catch (e) { + if (e instanceof QuotaExceededError) { + spinner.fail("Quota exceeded"); + console.error( + "\n❌ Free tier quota exceeded. You've reached the monthly limit of 2,000,000 store tokens.", + ); + console.error( + " Upgrade your plan at https://platform.mixedbread.com to continue syncing.\n", + ); + process.exit(1); + } + spinner.fail("Initial upload failed"); + throw e; } - spinner.fail("Initial upload failed"); - throw e; } console.log("Watching for file changes in", watchRoot); @@ -141,6 +172,7 @@ export async function startWatch(options: { if (refreshInterval) { clearInterval(refreshInterval); } + cleanup(); const message = error instanceof Error ? error.message : "Unknown error"; console.error("Failed to start watcher:", message); process.exitCode = 1; diff --git a/src/lib/lock.ts b/src/lib/lock.ts new file mode 100644 index 0000000..e3b758f --- /dev/null +++ b/src/lib/lock.ts @@ -0,0 +1,98 @@ +import * as fs from "node:fs"; +import * as path from "node:path"; + +const LOCK_DIR = "/tmp"; +const LOCK_PREFIX = "mgrep-watch-lock-"; + +/** + * Generates a lock file path based on the directory being watched. + * Uses a hash of the directory path to create a unique lock file name. + */ +function getLockFilePath(watchDir: string): string { + const normalizedPath = path.resolve(watchDir); + const hash = Buffer.from(normalizedPath).toString("base64url"); + return path.join(LOCK_DIR, `${LOCK_PREFIX}${hash}.lock`); +} + +/** + * Attempts to acquire a lock for the given directory. + * Returns true if the lock was acquired, false if another process holds it. + */ +export function acquireLock(watchDir: string): boolean { + const lockFile = getLockFilePath(watchDir); + + try { + if (fs.existsSync(lockFile)) { + const content = fs.readFileSync(lockFile, "utf-8"); + const pid = Number.parseInt(content.trim(), 10); + + if (!Number.isNaN(pid)) { + try { + process.kill(pid, 0); + return false; + } catch { + fs.unlinkSync(lockFile); + } + } + } + + fs.writeFileSync(lockFile, process.pid.toString(), { flag: "wx" }); + return true; + } catch (err) { + if (err instanceof Error && "code" in err && err.code === "EEXIST") { + return false; + } + throw err; + } +} + +/** + * Releases the lock for the given directory. + * Only removes the lock file if this process owns it. + */ +export function releaseLock(watchDir: string): void { + const lockFile = getLockFilePath(watchDir); + + try { + if (fs.existsSync(lockFile)) { + const content = fs.readFileSync(lockFile, "utf-8"); + const pid = Number.parseInt(content.trim(), 10); + + if (pid === process.pid) { + fs.unlinkSync(lockFile); + } + } + } catch { + // Ignore errors during cleanup + } +} + +/** + * Checks if a lock exists for the given directory. + * Returns true if the lock is held by a running process. + */ +export function isLocked(watchDir: string): boolean { + const lockFile = getLockFilePath(watchDir); + + try { + if (!fs.existsSync(lockFile)) { + return false; + } + + const content = fs.readFileSync(lockFile, "utf-8"); + const pid = Number.parseInt(content.trim(), 10); + + if (Number.isNaN(pid)) { + return false; + } + + try { + process.kill(pid, 0); + return true; + } catch { + return false; + } + } catch { + return false; + } +}