Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import {
commit,
url,
rm,
rmTracked,
resync,
ls,
config,
watch,
Expand Down Expand Up @@ -248,6 +250,39 @@ program
await rm(path);
});

// Rm-tracked command — remove a specific path from the snapshot
program
  .command("rm-tracked")
  .summary("Give up tracking a chronically unavailable path")
  .description(
    "Remove a file from the pushwork snapshot (and by default from the local filesystem). " +
      "Use this when a tracked file has been unavailable on the remote for so many syncs " +
      "that you want pushwork to stop trying to reconcile it."
  )
  .argument("<file>", "Relative path of the tracked file")
  .argument("[path]", "Directory path (default: current directory)", ".")
  .option("--keep-local", "Keep the local file; only remove it from the snapshot", false)
  .action(async (trackedFile, dirPath, cmdOpts) => {
    // Note: rmTracked takes (directory, file) — reversed from the CLI
    // argument order.
    const rmOptions = { keepLocal: cmdOpts.keepLocal };
    await rmTracked(dirPath, trackedFile, rmOptions);
  });

// Resync command — re-push a tracked file as a fresh document
program
  .command("resync")
  .summary("Re-push a tracked file as a fresh Automerge document")
  .description(
    "Create a new Automerge document from the current local content and update " +
      "the parent directory entry to point at it. Useful when the existing document " +
      "has become chronically unavailable or corrupted on the remote."
  )
  .argument("<file>", "Relative path of the tracked file")
  .argument("[path]", "Directory path (default: current directory)", ".")
  .action(async (trackedFile, dirPath) => {
    // Note: resync takes (directory, file) — reversed from the CLI
    // argument order.
    await resync(dirPath, trackedFile);
  });

// List command
program
.command("ls")
Expand Down
153 changes: 133 additions & 20 deletions src/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,15 @@ async function initializeRepository(
}

// Create repository and sync engine
const repo = await createRepo(resolvedPath, config, sub);
const syncEngine = new SyncEngine(repo, resolvedPath, config);
const { repo, requiresRehydrate, recoveryReason } = await createRepo(
resolvedPath,
config,
sub
);
const syncEngine = new SyncEngine(repo, resolvedPath, config, {
requiresRehydrate,
recoveryReason,
});

return { config, repo, syncEngine };
}
Expand Down Expand Up @@ -122,10 +129,17 @@ async function setupCommandContext(
}

// Create repo with config
const repo = await createRepo(resolvedPath, config, sub);
const { repo, requiresRehydrate, recoveryReason } = await createRepo(
resolvedPath,
config,
sub
);

// Create sync engine
const syncEngine = new SyncEngine(repo, resolvedPath, config);
const syncEngine = new SyncEngine(repo, resolvedPath, config, {
requiresRehydrate,
recoveryReason,
});

return {
repo,
Expand Down Expand Up @@ -531,23 +545,52 @@ export async function status(

// Show verbose details if requested
if (options.verbose && syncStatus.snapshot?.rootDirectoryUrl) {
const rootHandle = await repo.find<DirectoryDocument>(
syncStatus.snapshot.rootDirectoryUrl
);
const rootDoc = await rootHandle.doc();

if (rootDoc) {
out.infoBlock("HEADS");
out.arr(rootHandle.heads());

if (syncStatus.snapshot && syncStatus.snapshot.files.size > 0) {
out.infoBlock("TRACKED FILES");
const filesObj: Record<string, string> = {};
syncStatus.snapshot.files.forEach((entry, filePath) => {
filesObj[filePath] = entry.url;
});
out.obj(filesObj);
try {
const rootHandle = await repo.find<DirectoryDocument>(
syncStatus.snapshot.rootDirectoryUrl
);
const rootDoc = await rootHandle.doc();

if (rootDoc) {
out.infoBlock("HEADS");
out.arr(rootHandle.heads());

if (syncStatus.snapshot && syncStatus.snapshot.files.size > 0) {
out.infoBlock("TRACKED FILES");
const filesObj: Record<string, string> = {};
syncStatus.snapshot.files.forEach((entry, filePath) => {
filesObj[filePath] = entry.url;
});
out.obj(filesObj);
}
}
} catch (error) {
out.warn(`Warning: Could not load root document details: ${error}`);
}
}

// Chronic-unavailable section: paths whose consecutive-unavailable
// counter is non-zero. These are files pushwork has repeatedly
// declined to reconcile because their remote state could not be
// confirmed. Always shown in verbose mode, independent of root-doc
// availability, because the counter is a pure snapshot property.
if (options.verbose && syncStatus.snapshot) {
const chronic: Array<{ path: string; count: number }> = [];
syncStatus.snapshot.files.forEach((entry, filePath) => {
const n = entry.consecutiveUnavailableCount ?? 0;
if (n > 0) chronic.push({ path: filePath, count: n });
});
if (chronic.length > 0) {
chronic.sort((a, b) => b.count - a.count);
out.infoBlock("CHRONICALLY UNAVAILABLE");
const obj: Record<string, string> = {};
for (const { path: p, count } of chronic) {
obj[p] = `${count} consecutive sync(s) unavailable`;
}
out.obj(obj);
out.info(
"Resolve with: pushwork rm-tracked <path> or pushwork resync <path>"
);
}
}

Expand Down Expand Up @@ -749,6 +792,76 @@ export async function rm(targetPath: string = "."): Promise<void> {
process.exit();
}

/**
 * Remove a tracked path from the snapshot — the escape hatch for a file
 * that has been chronically unavailable on the remote and that the user
 * no longer wants pushwork to try to reconcile.
 *
 * @param targetPath   Directory containing the pushwork workspace.
 * @param relativePath Snapshot-relative path of the file to stop tracking.
 * @param options      `keepLocal: true` leaves the file on disk and only
 *                     drops it from the snapshot; by default the local
 *                     copy is deleted as well.
 */
export async function rmTracked(
  targetPath: string,
  relativePath: string,
  options: { keepLocal?: boolean } = {}
): Promise<void> {
  // Sync is disabled here: dropping a snapshot entry must not trigger a
  // reconciliation pass against the (possibly unavailable) remote.
  const { syncEngine, repo } = await setupCommandContext(targetPath, {
    syncEnabled: false,
  });

  const shouldDeleteLocal = !options.keepLocal;
  const taskSuffix = shouldDeleteLocal ? " and local disk" : "";
  out.task(`Removing ${relativePath} from snapshot${taskSuffix}`);
  try {
    const wasTracked = await syncEngine.removeTrackedPath(relativePath, {
      deleteLocal: shouldDeleteLocal,
    });
    if (!wasTracked) {
      out.done();
      out.warn(`Path not tracked: ${relativePath}`);
      // NOTE(review): repo is shut down here AND in `finally` if
      // `out.exit` returns control — assumes safeRepoShutdown is
      // idempotent (or out.exit terminates the process); confirm.
      await safeRepoShutdown(repo);
      out.exit(1);
      return;
    }
    out.done();
    out.success(`Removed ${relativePath}`);
  } catch (error) {
    out.crash(error);
  } finally {
    await safeRepoShutdown(repo);
  }
}

/**
 * Force-recreate the remote document for a tracked path from its current
 * local content: mint a fresh Automerge document and repoint the parent
 * directory entry at it. Recovery path for a file whose existing document
 * has become chronically unavailable or corrupted on the remote.
 *
 * @param targetPath   Directory containing the pushwork workspace.
 * @param relativePath Snapshot-relative path of the file to re-push.
 */
export async function resync(
  targetPath: string,
  relativePath: string
): Promise<void> {
  const { repo, syncEngine } = await setupCommandContext(targetPath);

  out.task(`Re-syncing ${relativePath}`);
  try {
    const succeeded = await syncEngine.resyncPath(relativePath);
    if (!succeeded) {
      out.done();
      out.warn(
        `Could not re-sync ${relativePath} (not tracked or local file missing)`
      );
      // NOTE(review): repo is shut down here AND in `finally` if
      // `out.exit` returns control — assumes safeRepoShutdown is
      // idempotent (or out.exit terminates the process); confirm.
      await safeRepoShutdown(repo);
      out.exit(1);
      return;
    }
    out.done();
    out.success(`Re-synced ${relativePath} to a fresh document`);
  } catch (error) {
    out.crash(error);
  } finally {
    await safeRepoShutdown(repo);
  }
}

export async function commit(
targetPath: string,
_options: CommandOptions = {}
Expand Down
81 changes: 75 additions & 6 deletions src/core/change-detection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
joinAndNormalizePath,
getPlainUrl,
readDocContent,
RemoteLookup,
} from "../utils"
import {isContentEqual, contentHash} from "../utils/content"
import {out} from "../utils/output"
Expand All @@ -34,13 +35,47 @@ function debug(...args: any[]) {
* Change detection engine
*/
export class ChangeDetector {
/**
 * Snapshot-tracked paths whose remote lookup came back with kind
 * `unavailable` during the most recent `detectChanges` call. These
 * paths were deliberately NOT emitted as deletions, because absence
 * on the remote could not be confirmed. Callers may use this set to
 * maintain a per-path consecutive-unavailable counter — see
 * `SnapshotFileEntry`. Reset to an empty set at the start of every
 * `detectChanges` run.
 */
private _lastSkippedUnavailablePaths: Set<string> = new Set()

/**
 * Paths whose remote lookup succeeded (kind `found` or `absent`)
 * during the most recent `detectChanges` call. Callers should reset
 * the per-path unavailable counter to 0 for these paths. Reset to an
 * empty set at the start of every `detectChanges` run.
 */
private _lastConfirmedPaths: Set<string> = new Set()

/**
 * @param repo Automerge repo handle used for remote document lookups
 *   (passed through to the directory-hierarchy search).
 * @param rootPath Root of the local synced tree — presumably absolute;
 *   confirm against call sites.
 * @param excludePatterns Patterns excluded from change detection —
 *   presumably glob-style; confirm against the matcher implementation.
 * @param artifactDirectories Directories whose files use RawString and
 *   are always replaced wholesale rather than diffed.
 */
constructor(
  private repo: Repo,
  private rootPath: string,
  private excludePatterns: string[] = [],
  private artifactDirectories: string[] = []
) {}

/**
 * Remote-unconfirmable paths from the last `detectChanges` run: each
 * entry's directory lookup returned `unavailable`, so no deletion was
 * emitted for it. See `SnapshotFileEntry.consecutiveUnavailableCount`
 * for the counter callers maintain from this set.
 */
getLastSkippedUnavailablePaths(): ReadonlySet<string> {
  return this._lastSkippedUnavailablePaths
}

/**
 * Remote-confirmed paths from the last `detectChanges` run: each
 * entry's lookup succeeded with kind `found` or `absent`. Callers
 * should zero the per-path unavailable counter for every path in
 * this set.
 */
getLastConfirmedPaths(): ReadonlySet<string> {
  return this._lastConfirmedPaths
}

/**
* Check if a file path is inside an artifact directory.
* Artifact files use RawString and are always replaced wholesale,
Expand All @@ -57,6 +92,8 @@ export class ChangeDetector {
*/
async detectChanges(snapshot: SyncSnapshot, excludePaths?: Set<string>): Promise<DetectedChange[]> {
const changes: DetectedChange[] = []
this._lastSkippedUnavailablePaths = new Set()
this._lastConfirmedPaths = new Set()

// Get current filesystem state
const currentFiles = await this.getCurrentFilesystemState()
Expand Down Expand Up @@ -247,13 +284,32 @@ export class ChangeDetector {
Array.from(snapshot.files.entries()).map(
async ([relativePath, snapshotEntry]) => {
// Find the file's current entry in the remote directory hierarchy
const remoteEntry = await this.findInRemoteDirectory(
const lookup = await this.findInRemoteDirectory(
snapshot.rootDirectoryUrl,
relativePath
)

if (!remoteEntry) {
// File was removed from remote directory listing
if (lookup.kind === "unavailable") {
// We could not read the authoritative directory for
// this path. Do NOT emit a change: we have no basis
// to conclude anything about the remote state. The
// next sync will try again. Record the skip so the
// caller can maintain a consecutive-unavailable
// counter — chronic unavailability requires user
// intervention.
this._lastSkippedUnavailablePaths.add(relativePath)
debug(
`detectRemoteChanges: skipping ${relativePath} — directory unavailable (${lookup.reason})`
)
return
}

// Any kind of successful lookup (found or absent) counts
// as confirmed for the chronic-unavailability tracker.
this._lastConfirmedPaths.add(relativePath)

if (lookup.kind === "absent") {
// File was confirmed absent from the remote directory listing
const localContent = await this.getLocalContent(relativePath)

// Only report as deleted if local file still exists
Expand All @@ -267,11 +323,15 @@ export class ChangeDetector {
remoteContent: null, // File deleted remotely
localHead: snapshotEntry.head,
remoteHead: snapshotEntry.head,
confirmedAbsent: true,
})
}
return
}

// lookup.kind === "found"
const remoteEntry = lookup.entry

// Check if the document was replaced entirely (new URL).
// This happens when a peer replaces an artifact file, fixes a
// legacy immutable string, or recreates a failed document.
Expand Down Expand Up @@ -696,13 +756,22 @@ export class ChangeDetector {

/**
* Find a file's entry in the remote directory hierarchy.
* Returns the entry (with name, type, url) or null if not found.
*
* Returns a tri-state `RemoteLookup` that distinguishes:
* - `found`: the directory doc was read and the file entry exists
* - `absent`: the directory doc was read and the file entry is absent
* - `unavailable`: the directory doc could not be read (transient)
*
* Callers that perform destructive operations MUST check for `absent`
* specifically, not just "not found".
*/
private async findInRemoteDirectory(
rootDirectoryUrl: AutomergeUrl | undefined,
filePath: string
): Promise<{ name: string; type: string; url: AutomergeUrl } | null> {
if (!rootDirectoryUrl) return null
): Promise<RemoteLookup> {
if (!rootDirectoryUrl) {
return { kind: "unavailable", reason: "no root directory URL" }
}
return findFileInDirectoryHierarchy(
this.repo,
rootDirectoryUrl,
Expand Down
Loading