From a89dacb1d077b7dae77d678ee516704e52db9c79 Mon Sep 17 00:00:00 2001 From: indaco Date: Thu, 7 May 2026 20:53:55 +0200 Subject: [PATCH 1/5] fix(services/supervisor): break the poll loop on sleep cancellation The follow-log loop swallowed a cancelled sleep, so a stop signal that reached the io subsystem before the interrupt flag flipped left the loop busy-spinning. Propagate the cancellation as a clean exit. --- src/core/services/supervisor.zig | 58 +++++++++++++++++++++++++++++++- 1 file changed, 57 insertions(+), 1 deletion(-) diff --git a/src/core/services/supervisor.zig b/src/core/services/supervisor.zig index d536963..efabbfc 100644 --- a/src/core/services/supervisor.zig +++ b/src/core/services/supervisor.zig @@ -407,7 +407,12 @@ pub fn followLog( while (!interrupted()) { const n = f.readPositionalAll(io, &buf, offset) catch return SupervisorError.IoFailed; if (n == 0) { - std.Io.sleep(io, std.Io.Duration.fromNanoseconds(@intCast(poll_ns)), .awake) catch {}; + // Cancelled sleep means SIGINT (or another stop signal) reached + // the io subsystem before `interrupted()` flipped; bail rather + // than busy-spin until the flag catches up. + std.Io.sleep(io, std.Io.Duration.fromNanoseconds(@intCast(poll_ns)), .awake) catch |e| switch (e) { + error.Canceled => break, + }; continue; } writer.writeAll(buf[0..n]) catch return SupervisorError.IoFailed; @@ -518,3 +523,54 @@ test "followLog flushes appended bytes between polls" { try testing.expect(std.mem.indexOf(u8, aw.written(), "appended\n") != null); try testing.expectEqual(@as(usize, 3), FollowProbe.calls); } + +// Patches an inner Io's vtable so `sleep` always reports cancellation; +// every other operation still flows to the real backend. 
+const CancelSleepProbe = struct { + var vtable: std.Io.VTable = undefined; + var sleep_calls: usize = 0; + + fn wrap(inner: std.Io) std.Io { + vtable = inner.vtable.*; + vtable.sleep = sleepCanceled; + sleep_calls = 0; + return .{ .userdata = inner.userdata, .vtable = &vtable }; + } + + fn sleepCanceled(_: ?*anyopaque, _: std.Io.Timeout) std.Io.Cancelable!void { + sleep_calls += 1; + return error.Canceled; + } +}; + +test "followLog breaks when sleep reports cancellation" { + testIoInit(); + defer testIoDeinit(); + + const dir = "/tmp/malt_supervisor_follow_cancel"; + std.Io.Dir.cwd().deleteTree(test_io, dir) catch {}; + try std.Io.Dir.createDirAbsolute(test_io, dir, .default_dir); + defer std.Io.Dir.cwd().deleteTree(test_io, dir) catch {}; + + const log_path = dir ++ "/sample.log"; + { + const f = try std.Io.Dir.createFileAbsolute(test_io, log_path, .{ .truncate = true }); + defer f.close(test_io); + try f.writeStreamingAll(test_io, "seed\n"); + } + + var aw: std.Io.Writer.Allocating = .init(testing.allocator); + defer aw.deinit(); + + const cancel_io = CancelSleepProbe.wrap(test_io); + + FollowProbe.reset(); + // High stop_at: only sleep cancellation should end the loop. If the loop + // ever spins past Canceled, FollowProbe.calls will exceed 1. + FollowProbe.stop_at = 16; + + try followLog(cancel_io, testing.allocator, log_path, 0, &aw.writer, FollowProbe.cb); + + try testing.expectEqual(@as(usize, 1), CancelSleepProbe.sleep_calls); + try testing.expectEqual(@as(usize, 1), FollowProbe.calls); +} From 1b3f5e79320d15e6989f5ac9a3b681a85b6e6476 Mon Sep 17 00:00:00 2001 From: indaco Date: Thu, 7 May 2026 21:01:44 +0200 Subject: [PATCH 2/5] fix(net/client): propagate sleep cancellation through retry backoff std.Io cancellation is single-shot per task: a backoff-sleep that catches and discards Canceled consumes the caller's stop signal, so the next request never sees it and the retry loop runs to completion. Surface Canceled as the result instead. 
--- src/net/client.zig | 70 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 66 insertions(+), 4 deletions(-) diff --git a/src/net/client.zig b/src/net/client.zig index 8511580..bdf4447 100644 --- a/src/net/client.zig +++ b/src/net/client.zig @@ -377,8 +377,13 @@ pub const HttpClient = struct { if (classifyStatus(resp.status)) |dl_err| { if (isTransientError(dl_err) and attempt < max_retries) { resp.allocator.free(resp.body); - // Sleep is backoff jitter; interruption just retries sooner. - std.Io.sleep(self.io, std.Io.Duration.fromNanoseconds(@intCast(retry_delays_ms[attempt] * std.time.ns_per_ms)), .awake) catch {}; + // Cancellation is single-shot per task in std.Io — + // swallowing it here means the caller's stop signal + // is consumed by the backoff and never reaches the + // next request, so propagate it as the result. + std.Io.sleep(self.io, std.Io.Duration.fromNanoseconds(@intCast(retry_delays_ms[attempt] * std.time.ns_per_ms)), .awake) catch |e| switch (e) { + error.Canceled => return error.Canceled, + }; attempt += 1; continue; } @@ -386,8 +391,9 @@ pub const HttpClient = struct { return resp; } else |err| { if (attempt < max_retries) { - // Sleep is backoff jitter; interruption just retries sooner. 
- std.Io.sleep(self.io, std.Io.Duration.fromNanoseconds(@intCast(retry_delays_ms[attempt] * std.time.ns_per_ms)), .awake) catch {}; + std.Io.sleep(self.io, std.Io.Duration.fromNanoseconds(@intCast(retry_delays_ms[attempt] * std.time.ns_per_ms)), .awake) catch |e| switch (e) { + error.Canceled => return error.Canceled, + }; attempt += 1; continue; } @@ -712,3 +718,59 @@ test "shouldFireIdleWatchdog: cancellation fires regardless of elapsed time" { try std.testing.expect(shouldFireIdleWatchdog(0, 0, 30, 600, true)); try std.testing.expect(shouldFireIdleWatchdog(1, 1, 999_999, 999_999, true)); } + +// Patches an inner Io's vtable so `sleep` reports cancellation on the +// configured call index; non-canceled sleeps return immediately so the +// retry-table delays don't pad the test runtime. `cancel_at = 1` cancels +// the first sleep, `cancel_at = N` returns from the prior N-1 sleeps and +// trips the Nth. +const CancelSleepProbe = struct { + var vtable: std.Io.VTable = undefined; + var sleep_calls: usize = 0; + var cancel_at: usize = 1; + + fn wrap(inner: std.Io, cancel_at_call: usize) std.Io { + vtable = inner.vtable.*; + vtable.sleep = sleepMaybeCanceled; + sleep_calls = 0; + cancel_at = cancel_at_call; + return .{ .userdata = inner.userdata, .vtable = &vtable }; + } + + fn sleepMaybeCanceled(_: ?*anyopaque, _: std.Io.Timeout) std.Io.Cancelable!void { + sleep_calls += 1; + if (sleep_calls >= cancel_at) return error.Canceled; + } +}; + +test "doGetWithRetry surfaces sleep cancellation on the first backoff" { + var threaded: std.Io.Threaded = .init(std.testing.allocator, .{}); + defer threaded.deinit(); + const cancel_io = CancelSleepProbe.wrap(threaded.io(), 1); + + var http = HttpClient.init(cancel_io, std.process.Environ.empty, std.testing.allocator); + defer http.deinit(); + + // 127.0.0.1:1 fast-fails with ECONNREFUSED so the retry-sleep is reached + // on the first attempt; without the fix, every backoff swallows Canceled + // and the call resolves with the 
connect error after burning all retries. + const result = http.get("http://127.0.0.1:1/nothing-listens-here"); + try std.testing.expectError(error.Canceled, result); + try std.testing.expectEqual(@as(usize, 1), CancelSleepProbe.sleep_calls); +} + +test "doGetWithRetry surfaces sleep cancellation on a later backoff" { + var threaded: std.Io.Threaded = .init(std.testing.allocator, .{}); + defer threaded.deinit(); + // Cancel the third sleep — the prior two return normally, exercising + // the retry counter alongside the catch arm so we know the propagation + // isn't tied to attempt 0 only. + const cancel_io = CancelSleepProbe.wrap(threaded.io(), 3); + + var http = HttpClient.init(cancel_io, std.process.Environ.empty, std.testing.allocator); + defer http.deinit(); + + const result = http.get("http://127.0.0.1:1/nothing-listens-here"); + try std.testing.expectError(error.Canceled, result); + try std.testing.expectEqual(@as(usize, 3), CancelSleepProbe.sleep_calls); +} From 6329222280cc3ce9f1cb28ec811b4a0f552a7e91 Mon Sep 17 00:00:00 2001 From: indaco Date: Thu, 7 May 2026 21:09:03 +0200 Subject: [PATCH 3/5] fix(install/download): break the bottle retry loop on sleep cancellation Symmetric with the net retry path: a swallowed Canceled would consume the caller's stop signal so the next attempt's blocking call never sees it. Surface the cancellation by exiting the retry loop instead. --- src/cli/install/download.zig | 65 +++++++++++++++++++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) diff --git a/src/cli/install/download.zig b/src/cli/install/download.zig index 66ecd03..131ffd7 100644 --- a/src/cli/install/download.zig +++ b/src/cli/install/download.zig @@ -78,6 +78,17 @@ pub fn progressBridge(ctx: *anyopaque, bytes_so_far: u64, content_length: ?u64) bar.update(clamped); } +/// Sleeps for `ms` between retries. Returns false when the caller's +/// stop signal cancels the sleep, true otherwise. 
std.Io cancellation +/// is single-shot per task, so a swallowed Canceled would silently +/// consume the cancellation request and the loop would keep retrying. +fn cancellableBackoff(io: std.Io, ms: u64) bool { + std.Io.sleep(io, std.Io.Duration.fromNanoseconds(@intCast(ms * std.time.ns_per_ms)), .awake) catch |e| switch (e) { + error.Canceled => return false, + }; + return true; +} + /// Download a bottle and commit to store. Runs in a worker thread. /// `http_pool` is shared across all download workers — each worker /// borrows a client for the duration of a single blob download and @@ -152,7 +163,7 @@ pub fn downloadWorker( break; } if (dl_attempt + 1 < max_attempts) { - std.Io.sleep(io, std.Io.Duration.fromNanoseconds(@intCast(retry_delays_ms[dl_attempt] * std.time.ns_per_ms)), .awake) catch {}; + if (!cancellableBackoff(io, retry_delays_ms[dl_attempt])) break; } } } @@ -692,3 +703,55 @@ test "MaterializeResult.kegPath reflects keg_path_len after write" { r.keg_path_len = path.len; try std.testing.expectEqualStrings(path, r.kegPath()); } + +// Patches an inner Io's vtable so `sleep` reports cancellation from the +// configured call index onward; earlier sleeps return immediately so the +// retry-table delays don't pad the test runtime. 
+const CancelSleepProbe = struct { + var vtable: std.Io.VTable = undefined; + var sleep_calls: usize = 0; + var cancel_at: usize = 1; + + fn wrap(inner: std.Io, cancel_at_call: usize) std.Io { + vtable = inner.vtable.*; + vtable.sleep = sleepMaybeCanceled; + sleep_calls = 0; + cancel_at = cancel_at_call; + return .{ .userdata = inner.userdata, .vtable = &vtable }; + } + + fn sleepMaybeCanceled(_: ?*anyopaque, _: std.Io.Timeout) std.Io.Cancelable!void { + sleep_calls += 1; + if (sleep_calls >= cancel_at) return error.Canceled; + } +}; + +test "cancellableBackoff returns true when sleep completes normally" { + var threaded: std.Io.Threaded = .init(std.testing.allocator, .{}); + defer threaded.deinit(); + // Zero-ms request keeps the test deterministic without exercising the + // real clock; a successful sleep must report true so the retry loop + // moves on to the next attempt. + try std.testing.expect(cancellableBackoff(threaded.io(), 0)); +} + +test "cancellableBackoff returns false when sleep is cancelled" { + var threaded: std.Io.Threaded = .init(std.testing.allocator, .{}); + defer threaded.deinit(); + const cancel_io = CancelSleepProbe.wrap(threaded.io(), 1); + try std.testing.expect(!cancellableBackoff(cancel_io, 100)); + try std.testing.expectEqual(@as(usize, 1), CancelSleepProbe.sleep_calls); +} + +test "cancellableBackoff propagates cancellation when called repeatedly" { + var threaded: std.Io.Threaded = .init(std.testing.allocator, .{}); + defer threaded.deinit(); + // Cancel the third call: the prior two complete and report true, + // pinning that the helper isn't inadvertently latched after the + // first non-cancelled sleep. 
+ const cancel_io = CancelSleepProbe.wrap(threaded.io(), 3); + try std.testing.expect(cancellableBackoff(cancel_io, 0)); + try std.testing.expect(cancellableBackoff(cancel_io, 0)); + try std.testing.expect(!cancellableBackoff(cancel_io, 0)); + try std.testing.expectEqual(@as(usize, 3), CancelSleepProbe.sleep_calls); +} From 76aaebe2dc244682b451769875e1295e0876aed7 Mon Sep 17 00:00:00 2001 From: indaco Date: Thu, 7 May 2026 21:09:07 +0200 Subject: [PATCH 4/5] test(services/supervisor): expand follow-log cancellation coverage Pin three additional behaviours of the cancel-on-sleep path: that a late cancellation after several idle polls still ends the loop, that the loop doesn't latch on the prior non-cancelled returns, and that data appended just before the cancel is still flushed to the writer. --- src/core/services/supervisor.zig | 88 +++++++++++++++++++++++++++++--- 1 file changed, 80 insertions(+), 8 deletions(-) diff --git a/src/core/services/supervisor.zig b/src/core/services/supervisor.zig index efabbfc..98a13f2 100644 --- a/src/core/services/supervisor.zig +++ b/src/core/services/supervisor.zig @@ -524,26 +524,29 @@ test "followLog flushes appended bytes between polls" { try testing.expectEqual(@as(usize, 3), FollowProbe.calls); } -// Patches an inner Io's vtable so `sleep` always reports cancellation; -// every other operation still flows to the real backend. +// Patches an inner Io's vtable so `sleep` reports cancellation from the +// configured call index onward; earlier sleeps return immediately so the +// 200 ms poll cadence doesn't pad the test runtime. 
const CancelSleepProbe = struct { var vtable: std.Io.VTable = undefined; var sleep_calls: usize = 0; + var cancel_at: usize = 1; - fn wrap(inner: std.Io) std.Io { + fn wrap(inner: std.Io, cancel_at_call: usize) std.Io { vtable = inner.vtable.*; - vtable.sleep = sleepCanceled; + vtable.sleep = sleepMaybeCanceled; sleep_calls = 0; + cancel_at = cancel_at_call; return .{ .userdata = inner.userdata, .vtable = &vtable }; } - fn sleepCanceled(_: ?*anyopaque, _: std.Io.Timeout) std.Io.Cancelable!void { + fn sleepMaybeCanceled(_: ?*anyopaque, _: std.Io.Timeout) std.Io.Cancelable!void { sleep_calls += 1; - return error.Canceled; + if (sleep_calls >= cancel_at) return error.Canceled; } }; -test "followLog breaks when sleep reports cancellation" { +test "followLog breaks when sleep reports cancellation on the first poll" { testIoInit(); defer testIoDeinit(); @@ -562,7 +565,7 @@ test "followLog breaks when sleep reports cancellation" { var aw: std.Io.Writer.Allocating = .init(testing.allocator); defer aw.deinit(); - const cancel_io = CancelSleepProbe.wrap(test_io); + const cancel_io = CancelSleepProbe.wrap(test_io, 1); FollowProbe.reset(); // High stop_at: only sleep cancellation should end the loop. 
If the loop @@ -574,3 +577,72 @@ test "followLog breaks when sleep reports cancellation" { try testing.expectEqual(@as(usize, 1), CancelSleepProbe.sleep_calls); try testing.expectEqual(@as(usize, 1), FollowProbe.calls); } + +test "followLog breaks when sleep reports cancellation after several idle polls" { + testIoInit(); + defer testIoDeinit(); + + const dir = "/tmp/malt_supervisor_follow_cancel_late"; + std.Io.Dir.cwd().deleteTree(test_io, dir) catch {}; + try std.Io.Dir.createDirAbsolute(test_io, dir, .default_dir); + defer std.Io.Dir.cwd().deleteTree(test_io, dir) catch {}; + + const log_path = dir ++ "/sample.log"; + { + const f = try std.Io.Dir.createFileAbsolute(test_io, log_path, .{ .truncate = true }); + defer f.close(test_io); + try f.writeStreamingAll(test_io, "seed\n"); + } + + var aw: std.Io.Writer.Allocating = .init(testing.allocator); + defer aw.deinit(); + + // Cancel the fourth sleep — three idle polls happen first, pinning that + // the catch arm fires regardless of how many normal sleeps preceded it + // and that the loop doesn't latch on the prior non-cancelled returns. 
+ const cancel_io = CancelSleepProbe.wrap(test_io, 4); + + FollowProbe.reset(); + FollowProbe.stop_at = 16; + + try followLog(cancel_io, testing.allocator, log_path, 0, &aw.writer, FollowProbe.cb); + + try testing.expectEqual(@as(usize, 4), CancelSleepProbe.sleep_calls); + try testing.expectEqual(@as(usize, 4), FollowProbe.calls); +} + +test "followLog flushes data appended before sleep cancellation lands" { + testIoInit(); + defer testIoDeinit(); + + const dir = "/tmp/malt_supervisor_follow_cancel_flush"; + std.Io.Dir.cwd().deleteTree(test_io, dir) catch {}; + try std.Io.Dir.createDirAbsolute(test_io, dir, .default_dir); + defer std.Io.Dir.cwd().deleteTree(test_io, dir) catch {}; + + const log_path = dir ++ "/sample.log"; + { + const f = try std.Io.Dir.createFileAbsolute(test_io, log_path, .{ .truncate = true }); + defer f.close(test_io); + try f.writeStreamingAll(test_io, "seed\n"); + } + + var aw: std.Io.Writer.Allocating = .init(testing.allocator); + defer aw.deinit(); + + // Append data on the first interrupt-check, then cancel the second + // sleep so the flushed bytes must already be in the writer when the + // loop bails. 
+ FollowProbe.reset(); + FollowProbe.append_path = log_path; + FollowProbe.append_bytes = "tail-bytes\n"; + FollowProbe.append_at = 1; + FollowProbe.stop_at = 16; + + const cancel_io = CancelSleepProbe.wrap(test_io, 2); + + try followLog(cancel_io, testing.allocator, log_path, 0, &aw.writer, FollowProbe.cb); + + try testing.expect(std.mem.indexOf(u8, aw.written(), "tail-bytes\n") != null); + try testing.expectEqual(@as(usize, 2), CancelSleepProbe.sleep_calls); +} From a54bb5176bff81a0adf9583d4ee92e0716ec347f Mon Sep 17 00:00:00 2001 From: indaco Date: Thu, 7 May 2026 21:19:11 +0200 Subject: [PATCH 5/5] fix(ui/progress): break the spinner loop on sleep cancellation The spinner's stop_flag is the primary cancellation seam, but the loop should also bail if the io subsystem reports cancellation mid-tick rather than waiting another iteration for the flag to be set. --- src/ui/progress.zig | 71 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 68 insertions(+), 3 deletions(-) diff --git a/src/ui/progress.zig b/src/ui/progress.zig index b323ba8..ce96bc6 100644 --- a/src/ui/progress.zig +++ b/src/ui/progress.zig @@ -39,8 +39,14 @@ fn nowNs() i128 { return std.Io.Clock.real.now(pkg_io).toNanoseconds(); } -fn sleepNs(ns: u64) void { - std.Io.sleep(pkg_io, std.Io.Duration.fromNanoseconds(@intCast(ns)), .awake) catch {}; +/// Sleeps for `ns` nanoseconds against the package io. Returns false when +/// a cancellation request reached the io subsystem before the sleep +/// finished, so spin/poll callers can bail rather than swallow the signal. +fn sleepNs(ns: u64) bool { + std.Io.sleep(pkg_io, std.Io.Duration.fromNanoseconds(@intCast(ns)), .awake) catch |e| switch (e) { + error.Canceled => return false, + }; + return true; } /// Braille-based spinner frames, shared by ProgressBar and Spinner. 
@@ -561,7 +567,7 @@ pub const Spinner = struct { while (!self.stop_flag.load(.acquire)) { self.drawFrame(frame); frame +%= 1; - sleepNs(100 * std.time.ns_per_ms); + if (!sleepNs(100 * std.time.ns_per_ms)) break; } } @@ -672,3 +678,62 @@ test "restoreTerminal is callable without an active MultiProgress" { restoreTerminal(); restoreTerminal(); } + +// Patches an inner Io's vtable so `sleep` reports cancellation on the +// configured call index; non-canceled sleeps return immediately so the +// 100 ms spinner cadence doesn't pad the test runtime. +const CancelSleepProbe = struct { + var vtable: std.Io.VTable = undefined; + var sleep_calls: usize = 0; + var cancel_at: usize = 1; + + fn wrap(inner: std.Io, cancel_at_call: usize) std.Io { + vtable = inner.vtable.*; + vtable.sleep = sleepMaybeCanceled; + sleep_calls = 0; + cancel_at = cancel_at_call; + return .{ .userdata = inner.userdata, .vtable = &vtable }; + } + + fn sleepMaybeCanceled(_: ?*anyopaque, _: std.Io.Timeout) std.Io.Cancelable!void { + sleep_calls += 1; + if (sleep_calls >= cancel_at) return error.Canceled; + } +}; + +test "sleepNs returns true when the sleep completes normally" { + var threaded: std.Io.Threaded = .init(std.testing.allocator, .{}); + defer threaded.deinit(); + const prev_io = pkg_io; + pkg_io = threaded.io(); + defer pkg_io = prev_io; + + try std.testing.expect(sleepNs(0)); +} + +test "sleepNs returns false when sleep is cancelled" { + var threaded: std.Io.Threaded = .init(std.testing.allocator, .{}); + defer threaded.deinit(); + const prev_io = pkg_io; + pkg_io = CancelSleepProbe.wrap(threaded.io(), 1); + defer pkg_io = prev_io; + + try std.testing.expect(!sleepNs(100)); + try std.testing.expectEqual(@as(usize, 1), CancelSleepProbe.sleep_calls); +} + +test "sleepNs propagates cancellation when called repeatedly" { + var threaded: std.Io.Threaded = .init(std.testing.allocator, .{}); + defer threaded.deinit(); + const prev_io = pkg_io; + pkg_io = CancelSleepProbe.wrap(threaded.io(), 3); + 
defer pkg_io = prev_io; + + // Two normal sleeps complete and report true; the third trips Canceled, + // pinning that the helper isn't latched after the first non-cancelled + // return and matches the spin-loop's per-tick break contract. + try std.testing.expect(sleepNs(0)); + try std.testing.expect(sleepNs(0)); + try std.testing.expect(!sleepNs(0)); + try std.testing.expectEqual(@as(usize, 3), CancelSleepProbe.sleep_calls); +}