diff --git a/src/cli/install/download.zig b/src/cli/install/download.zig
index 66ecd03..131ffd7 100644
--- a/src/cli/install/download.zig
+++ b/src/cli/install/download.zig
@@ -78,6 +78,17 @@ pub fn progressBridge(ctx: *anyopaque, bytes_so_far: u64, content_length: ?u64)
     bar.update(clamped);
 }
 
+/// Sleeps for `ms` between retries. Returns false when the caller's
+/// stop signal cancels the sleep, true otherwise. std.Io cancellation
+/// is single-shot per task, so a swallowed Canceled would silently
+/// consume the request and the loop would keep retrying.
+fn cancellableBackoff(io: std.Io, ms: u64) bool {
+    std.Io.sleep(io, std.Io.Duration.fromNanoseconds(@intCast(ms * std.time.ns_per_ms)), .awake) catch |e| switch (e) {
+        error.Canceled => return false,
+    };
+    return true;
+}
+
 /// Download a bottle and commit to store. Runs in a worker thread.
 /// `http_pool` is shared across all download workers — each worker
 /// borrows a client for the duration of a single blob download and
@@ -152,7 +163,7 @@ pub fn downloadWorker(
                 break;
             }
             if (dl_attempt + 1 < max_attempts) {
-                std.Io.sleep(io, std.Io.Duration.fromNanoseconds(@intCast(retry_delays_ms[dl_attempt] * std.time.ns_per_ms)), .awake) catch {};
+                if (!cancellableBackoff(io, retry_delays_ms[dl_attempt])) break;
             }
         }
     }
@@ -692,3 +703,55 @@ test "MaterializeResult.kegPath reflects keg_path_len after write" {
     r.keg_path_len = path.len;
     try std.testing.expectEqualStrings(path, r.kegPath());
 }
+
+// Patches an inner Io's vtable so `sleep` reports cancellation on the
+// configured call index; non-canceled sleeps return immediately so the
+// retry-table delays don't pad the test runtime.
+const CancelSleepProbe = struct {
+    var vtable: std.Io.VTable = undefined;
+    var sleep_calls: usize = 0;
+    var cancel_at: usize = 1;
+
+    fn wrap(inner: std.Io, cancel_at_call: usize) std.Io {
+        vtable = inner.vtable.*;
+        vtable.sleep = sleepMaybeCanceled;
+        sleep_calls = 0;
+        cancel_at = cancel_at_call;
+        return .{ .userdata = inner.userdata, .vtable = &vtable };
+    }
+
+    fn sleepMaybeCanceled(_: ?*anyopaque, _: std.Io.Timeout) std.Io.Cancelable!void {
+        sleep_calls += 1;
+        if (sleep_calls >= cancel_at) return error.Canceled;
+    }
+};
+
+test "cancellableBackoff returns true when sleep completes normally" {
+    var threaded: std.Io.Threaded = .init(std.testing.allocator, .{});
+    defer threaded.deinit();
+    // Zero-ms request keeps the test deterministic without exercising the
+    // real clock; a successful sleep must report true so the retry loop
+    // moves on to the next attempt.
+    try std.testing.expect(cancellableBackoff(threaded.io(), 0));
+}
+
+test "cancellableBackoff returns false when sleep is cancelled" {
+    var threaded: std.Io.Threaded = .init(std.testing.allocator, .{});
+    defer threaded.deinit();
+    const cancel_io = CancelSleepProbe.wrap(threaded.io(), 1);
+    try std.testing.expect(!cancellableBackoff(cancel_io, 100));
+    try std.testing.expectEqual(@as(usize, 1), CancelSleepProbe.sleep_calls);
+}
+
+test "cancellableBackoff propagates cancellation when called repeatedly" {
+    var threaded: std.Io.Threaded = .init(std.testing.allocator, .{});
+    defer threaded.deinit();
+    // Cancel the third call: the prior two complete and report true,
+    // pinning that the helper isn't inadvertently latched after the
+    // first non-cancelled sleep.
+    const cancel_io = CancelSleepProbe.wrap(threaded.io(), 3);
+    try std.testing.expect(cancellableBackoff(cancel_io, 0));
+    try std.testing.expect(cancellableBackoff(cancel_io, 0));
+    try std.testing.expect(!cancellableBackoff(cancel_io, 0));
+    try std.testing.expectEqual(@as(usize, 3), CancelSleepProbe.sleep_calls);
+}
diff --git a/src/core/services/supervisor.zig b/src/core/services/supervisor.zig
index d536963..98a13f2 100644
--- a/src/core/services/supervisor.zig
+++ b/src/core/services/supervisor.zig
@@ -407,7 +407,12 @@ pub fn followLog(
     while (!interrupted()) {
         const n = f.readPositionalAll(io, &buf, offset) catch return SupervisorError.IoFailed;
         if (n == 0) {
-            std.Io.sleep(io, std.Io.Duration.fromNanoseconds(@intCast(poll_ns)), .awake) catch {};
+            // Cancelled sleep means SIGINT (or another stop signal) reached
+            // the io subsystem before `interrupted()` flipped; bail rather
+            // than busy-spin until the flag catches up.
+            std.Io.sleep(io, std.Io.Duration.fromNanoseconds(@intCast(poll_ns)), .awake) catch |e| switch (e) {
+                error.Canceled => break,
+            };
             continue;
         }
         writer.writeAll(buf[0..n]) catch return SupervisorError.IoFailed;
@@ -518,3 +523,126 @@
     try testing.expect(std.mem.indexOf(u8, aw.written(), "appended\n") != null);
     try testing.expectEqual(@as(usize, 3), FollowProbe.calls);
 }
+
+// Patches an inner Io's vtable so `sleep` reports cancellation on the
+// configured call index; non-canceled sleeps return immediately so the
+// 200 ms poll cadence doesn't pad the test runtime.
+const CancelSleepProbe = struct {
+    var vtable: std.Io.VTable = undefined;
+    var sleep_calls: usize = 0;
+    var cancel_at: usize = 1;
+
+    fn wrap(inner: std.Io, cancel_at_call: usize) std.Io {
+        vtable = inner.vtable.*;
+        vtable.sleep = sleepMaybeCanceled;
+        sleep_calls = 0;
+        cancel_at = cancel_at_call;
+        return .{ .userdata = inner.userdata, .vtable = &vtable };
+    }
+
+    fn sleepMaybeCanceled(_: ?*anyopaque, _: std.Io.Timeout) std.Io.Cancelable!void {
+        sleep_calls += 1;
+        if (sleep_calls >= cancel_at) return error.Canceled;
+    }
+};
+
+test "followLog breaks when sleep reports cancellation on the first poll" {
+    testIoInit();
+    defer testIoDeinit();
+
+    const dir = "/tmp/malt_supervisor_follow_cancel";
+    std.Io.Dir.cwd().deleteTree(test_io, dir) catch {};
+    try std.Io.Dir.createDirAbsolute(test_io, dir, .default_dir);
+    defer std.Io.Dir.cwd().deleteTree(test_io, dir) catch {};
+
+    const log_path = dir ++ "/sample.log";
+    {
+        const f = try std.Io.Dir.createFileAbsolute(test_io, log_path, .{ .truncate = true });
+        defer f.close(test_io);
+        try f.writeStreamingAll(test_io, "seed\n");
+    }
+
+    var aw: std.Io.Writer.Allocating = .init(testing.allocator);
+    defer aw.deinit();
+
+    const cancel_io = CancelSleepProbe.wrap(test_io, 1);
+
+    FollowProbe.reset();
+    // High stop_at: only sleep cancellation should end the loop. If the loop
+    // ever spins past Canceled, FollowProbe.calls will exceed 1.
+    FollowProbe.stop_at = 16;
+
+    try followLog(cancel_io, testing.allocator, log_path, 0, &aw.writer, FollowProbe.cb);
+
+    try testing.expectEqual(@as(usize, 1), CancelSleepProbe.sleep_calls);
+    try testing.expectEqual(@as(usize, 1), FollowProbe.calls);
+}
+
+test "followLog breaks when sleep reports cancellation after several idle polls" {
+    testIoInit();
+    defer testIoDeinit();
+
+    const dir = "/tmp/malt_supervisor_follow_cancel_late";
+    std.Io.Dir.cwd().deleteTree(test_io, dir) catch {};
+    try std.Io.Dir.createDirAbsolute(test_io, dir, .default_dir);
+    defer std.Io.Dir.cwd().deleteTree(test_io, dir) catch {};
+
+    const log_path = dir ++ "/sample.log";
+    {
+        const f = try std.Io.Dir.createFileAbsolute(test_io, log_path, .{ .truncate = true });
+        defer f.close(test_io);
+        try f.writeStreamingAll(test_io, "seed\n");
+    }
+
+    var aw: std.Io.Writer.Allocating = .init(testing.allocator);
+    defer aw.deinit();
+
+    // Cancel the fourth sleep — three idle polls happen first, pinning that
+    // the catch arm fires regardless of how many normal sleeps preceded it
+    // and that the loop doesn't latch on the prior non-cancelled returns.
+    const cancel_io = CancelSleepProbe.wrap(test_io, 4);
+
+    FollowProbe.reset();
+    FollowProbe.stop_at = 16;
+
+    try followLog(cancel_io, testing.allocator, log_path, 0, &aw.writer, FollowProbe.cb);
+
+    try testing.expectEqual(@as(usize, 4), CancelSleepProbe.sleep_calls);
+    try testing.expectEqual(@as(usize, 4), FollowProbe.calls);
+}
+
+test "followLog flushes data appended before sleep cancellation lands" {
+    testIoInit();
+    defer testIoDeinit();
+
+    const dir = "/tmp/malt_supervisor_follow_cancel_flush";
+    std.Io.Dir.cwd().deleteTree(test_io, dir) catch {};
+    try std.Io.Dir.createDirAbsolute(test_io, dir, .default_dir);
+    defer std.Io.Dir.cwd().deleteTree(test_io, dir) catch {};
+
+    const log_path = dir ++ "/sample.log";
+    {
+        const f = try std.Io.Dir.createFileAbsolute(test_io, log_path, .{ .truncate = true });
+        defer f.close(test_io);
+        try f.writeStreamingAll(test_io, "seed\n");
+    }
+
+    var aw: std.Io.Writer.Allocating = .init(testing.allocator);
+    defer aw.deinit();
+
+    // Append data on the first interrupt-check, then cancel the second
+    // sleep so the flushed bytes must already be in the writer when the
+    // loop bails.
+    FollowProbe.reset();
+    FollowProbe.append_path = log_path;
+    FollowProbe.append_bytes = "tail-bytes\n";
+    FollowProbe.append_at = 1;
+    FollowProbe.stop_at = 16;
+
+    const cancel_io = CancelSleepProbe.wrap(test_io, 2);
+
+    try followLog(cancel_io, testing.allocator, log_path, 0, &aw.writer, FollowProbe.cb);
+
+    try testing.expect(std.mem.indexOf(u8, aw.written(), "tail-bytes\n") != null);
+    try testing.expectEqual(@as(usize, 2), CancelSleepProbe.sleep_calls);
+}
diff --git a/src/net/client.zig b/src/net/client.zig
index 8511580..bdf4447 100644
--- a/src/net/client.zig
+++ b/src/net/client.zig
@@ -377,8 +377,13 @@ pub const HttpClient = struct {
             if (classifyStatus(resp.status)) |dl_err| {
                 if (isTransientError(dl_err) and attempt < max_retries) {
                     resp.allocator.free(resp.body);
-                    // Sleep is backoff jitter; interruption just retries sooner.
-                    std.Io.sleep(self.io, std.Io.Duration.fromNanoseconds(@intCast(retry_delays_ms[attempt] * std.time.ns_per_ms)), .awake) catch {};
+                    // Cancellation is single-shot per task in std.Io —
+                    // swallowing it here means the caller's stop signal
+                    // is consumed by the backoff and never reaches the
+                    // next request, so propagate it as the result.
+                    std.Io.sleep(self.io, std.Io.Duration.fromNanoseconds(@intCast(retry_delays_ms[attempt] * std.time.ns_per_ms)), .awake) catch |e| switch (e) {
+                        error.Canceled => return error.Canceled,
+                    };
                     attempt += 1;
                     continue;
                 }
@@ -386,8 +391,9 @@
             return resp;
         } else |err| {
             if (attempt < max_retries) {
-                // Sleep is backoff jitter; interruption just retries sooner.
-                std.Io.sleep(self.io, std.Io.Duration.fromNanoseconds(@intCast(retry_delays_ms[attempt] * std.time.ns_per_ms)), .awake) catch {};
+                std.Io.sleep(self.io, std.Io.Duration.fromNanoseconds(@intCast(retry_delays_ms[attempt] * std.time.ns_per_ms)), .awake) catch |e| switch (e) {
+                    error.Canceled => return error.Canceled,
+                };
                 attempt += 1;
                 continue;
             }
@@ -712,3 +718,59 @@ test "shouldFireIdleWatchdog: cancellation fires regardless of elapsed time" {
     try std.testing.expect(shouldFireIdleWatchdog(0, 0, 30, 600, true));
     try std.testing.expect(shouldFireIdleWatchdog(1, 1, 999_999, 999_999, true));
 }
+
+// Patches an inner Io's vtable so `sleep` reports cancellation on the
+// configured call index; non-canceled sleeps return immediately so the
+// retry-table delays don't pad the test runtime. `cancel_at = 1` cancels
+// the first sleep, `cancel_at = N` returns from the prior N-1 sleeps and
+// trips the Nth.
+const CancelSleepProbe = struct {
+    var vtable: std.Io.VTable = undefined;
+    var sleep_calls: usize = 0;
+    var cancel_at: usize = 1;
+
+    fn wrap(inner: std.Io, cancel_at_call: usize) std.Io {
+        vtable = inner.vtable.*;
+        vtable.sleep = sleepMaybeCanceled;
+        sleep_calls = 0;
+        cancel_at = cancel_at_call;
+        return .{ .userdata = inner.userdata, .vtable = &vtable };
+    }
+
+    fn sleepMaybeCanceled(_: ?*anyopaque, _: std.Io.Timeout) std.Io.Cancelable!void {
+        sleep_calls += 1;
+        if (sleep_calls >= cancel_at) return error.Canceled;
+    }
+};
+
+test "doGetWithRetry surfaces sleep cancellation on the first backoff" {
+    var threaded: std.Io.Threaded = .init(std.testing.allocator, .{});
+    defer threaded.deinit();
+    const cancel_io = CancelSleepProbe.wrap(threaded.io(), 1);
+
+    var http = HttpClient.init(cancel_io, std.process.Environ.empty, std.testing.allocator);
+    defer http.deinit();
+
+    // 127.0.0.1:1 fast-fails with ECONNREFUSED so the retry-sleep is reached
+    // on the first attempt; without the fix, every backoff swallows Canceled
+    // and the call resolves with the connect error after burning all retries.
+    const result = http.get("http://127.0.0.1:1/nothing-listens-here");
+    try std.testing.expectError(error.Canceled, result);
+    try std.testing.expectEqual(@as(usize, 1), CancelSleepProbe.sleep_calls);
+}
+
+test "doGetWithRetry surfaces sleep cancellation on a later backoff" {
+    var threaded: std.Io.Threaded = .init(std.testing.allocator, .{});
+    defer threaded.deinit();
+    // Cancel the third sleep — the prior two return normally, exercising
+    // the retry counter alongside the catch arm so we know the propagation
+    // isn't tied to attempt 0 only.
+    const cancel_io = CancelSleepProbe.wrap(threaded.io(), 3);
+
+    var http = HttpClient.init(cancel_io, std.process.Environ.empty, std.testing.allocator);
+    defer http.deinit();
+
+    const result = http.get("http://127.0.0.1:1/nothing-listens-here");
+    try std.testing.expectError(error.Canceled, result);
+    try std.testing.expectEqual(@as(usize, 3), CancelSleepProbe.sleep_calls);
+}
diff --git a/src/ui/progress.zig b/src/ui/progress.zig
index b323ba8..ce96bc6 100644
--- a/src/ui/progress.zig
+++ b/src/ui/progress.zig
@@ -39,8 +39,14 @@
 fn nowNs() i128 {
     return std.Io.Clock.real.now(pkg_io).toNanoseconds();
 }
-fn sleepNs(ns: u64) void {
-    std.Io.sleep(pkg_io, std.Io.Duration.fromNanoseconds(@intCast(ns)), .awake) catch {};
+/// Sleeps for `ns` nanoseconds against the package io. Returns false when
+/// a cancellation request reached the io subsystem before the sleep
+/// finished, so spin/poll callers can bail rather than swallow the signal.
+fn sleepNs(ns: u64) bool {
+    std.Io.sleep(pkg_io, std.Io.Duration.fromNanoseconds(@intCast(ns)), .awake) catch |e| switch (e) {
+        error.Canceled => return false,
+    };
+    return true;
 }
 
 /// Braille-based spinner frames, shared by ProgressBar and Spinner.
@@ -561,7 +567,7 @@ pub const Spinner = struct {
         while (!self.stop_flag.load(.acquire)) {
             self.drawFrame(frame);
             frame +%= 1;
-            sleepNs(100 * std.time.ns_per_ms);
+            if (!sleepNs(100 * std.time.ns_per_ms)) break;
         }
     }
 
@@ -672,3 +678,62 @@ test "restoreTerminal is callable without an active MultiProgress" {
     restoreTerminal();
     restoreTerminal();
 }
+
+// Patches an inner Io's vtable so `sleep` reports cancellation on the
+// configured call index; non-canceled sleeps return immediately so the
+// 100 ms spinner cadence doesn't pad the test runtime.
+const CancelSleepProbe = struct {
+    var vtable: std.Io.VTable = undefined;
+    var sleep_calls: usize = 0;
+    var cancel_at: usize = 1;
+
+    fn wrap(inner: std.Io, cancel_at_call: usize) std.Io {
+        vtable = inner.vtable.*;
+        vtable.sleep = sleepMaybeCanceled;
+        sleep_calls = 0;
+        cancel_at = cancel_at_call;
+        return .{ .userdata = inner.userdata, .vtable = &vtable };
+    }
+
+    fn sleepMaybeCanceled(_: ?*anyopaque, _: std.Io.Timeout) std.Io.Cancelable!void {
+        sleep_calls += 1;
+        if (sleep_calls >= cancel_at) return error.Canceled;
+    }
+};
+
+test "sleepNs returns true when the sleep completes normally" {
+    var threaded: std.Io.Threaded = .init(std.testing.allocator, .{});
+    defer threaded.deinit();
+    const prev_io = pkg_io;
+    pkg_io = threaded.io();
+    defer pkg_io = prev_io;
+
+    try std.testing.expect(sleepNs(0));
+}
+
+test "sleepNs returns false when sleep is cancelled" {
+    var threaded: std.Io.Threaded = .init(std.testing.allocator, .{});
+    defer threaded.deinit();
+    const prev_io = pkg_io;
+    pkg_io = CancelSleepProbe.wrap(threaded.io(), 1);
+    defer pkg_io = prev_io;
+
+    try std.testing.expect(!sleepNs(100));
+    try std.testing.expectEqual(@as(usize, 1), CancelSleepProbe.sleep_calls);
+}
+
+test "sleepNs propagates cancellation when called repeatedly" {
+    var threaded: std.Io.Threaded = .init(std.testing.allocator, .{});
+    defer threaded.deinit();
+    const prev_io = pkg_io;
+    pkg_io = CancelSleepProbe.wrap(threaded.io(), 3);
+    defer pkg_io = prev_io;
+
+    // Two normal sleeps complete and report true; the third trips Canceled,
+    // pinning that the helper isn't latched after the first non-cancelled
+    // return and matches the spin-loop's per-tick break contract.
+    try std.testing.expect(sleepNs(0));
+    try std.testing.expect(sleepNs(0));
+    try std.testing.expect(!sleepNs(0));
+    try std.testing.expectEqual(@as(usize, 3), CancelSleepProbe.sleep_calls);
+}