Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 64 additions & 1 deletion src/cli/install/download.zig
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,17 @@ pub fn progressBridge(ctx: *anyopaque, bytes_so_far: u64, content_length: ?u64)
bar.update(clamped);
}

/// Sleeps for `ms` milliseconds between retry attempts.
///
/// Parameters:
///   - `io`: the Io implementation that performs the sleep.
///   - `ms`: backoff delay in milliseconds.
///
/// Returns `true` when the sleep ran to completion and `false` when it was
/// cut short by `error.Canceled`, so the caller can stop retrying. std.Io
/// cancellation is single-shot per task, so a swallowed Canceled would
/// silently consume the stop request and the retry loop would keep going.
fn cancellableBackoff(io: std.Io, ms: u64) bool {
    // Widen before scaling: `ms * ns_per_ms` evaluated in u64 overflows for
    // very large delays (a trap in safe builds, undefined in ReleaseFast).
    const delay_ns = @as(u128, ms) * std.time.ns_per_ms;
    const delay = std.Io.Duration.fromNanoseconds(@intCast(delay_ns));
    std.Io.sleep(io, delay, .awake) catch |err| switch (err) {
        error.Canceled => return false,
    };
    return true;
}

/// Download a bottle and commit to store. Runs in a worker thread.
/// `http_pool` is shared across all download workers — each worker
/// borrows a client for the duration of a single blob download and
Expand Down Expand Up @@ -152,7 +163,7 @@ pub fn downloadWorker(
break;
}
if (dl_attempt + 1 < max_attempts) {
std.Io.sleep(io, std.Io.Duration.fromNanoseconds(@intCast(retry_delays_ms[dl_attempt] * std.time.ns_per_ms)), .awake) catch {};
if (!cancellableBackoff(io, retry_delays_ms[dl_attempt])) break;
}
}
}
Expand Down Expand Up @@ -692,3 +703,55 @@ test "MaterializeResult.kegPath reflects keg_path_len after write" {
r.keg_path_len = path.len;
try std.testing.expectEqualStrings(path, r.kegPath());
}

// Test double for `std.Io`: copies the inner Io's vtable and swaps in a
// `sleep` that never blocks. Calls numbered below `cancel_at` return
// immediately, so the retry-table delays don't pad the test runtime; the
// `cancel_at`-th call and every call after it report `error.Canceled`.
// State lives in container-level vars, so every `wrap` re-arms the probe.
const CancelSleepProbe = struct {
    var vtable: std.Io.VTable = undefined;
    var sleep_calls: usize = 0;
    var cancel_at: usize = 1;

    // Resets the counters and returns an Io that shares `inner`'s userdata
    // but dispatches through the patched vtable.
    fn wrap(inner: std.Io, cancel_at_call: usize) std.Io {
        sleep_calls = 0;
        cancel_at = cancel_at_call;
        vtable = inner.vtable.*;
        vtable.sleep = sleepMaybeCanceled;
        return .{ .vtable = &vtable, .userdata = inner.userdata };
    }

    // Counts the call, then succeeds without sleeping until the threshold.
    fn sleepMaybeCanceled(_: ?*anyopaque, _: std.Io.Timeout) std.Io.Cancelable!void {
        sleep_calls += 1;
        if (sleep_calls < cancel_at) return;
        return error.Canceled;
    }
};

test "cancellableBackoff returns true when sleep completes normally" {
    var pool: std.Io.Threaded = .init(std.testing.allocator, .{});
    defer pool.deinit();
    // A zero-millisecond request keeps the test quick and deterministic.
    // An uninterrupted sleep must report true so the retry loop proceeds
    // to its next attempt.
    const completed = cancellableBackoff(pool.io(), 0);
    try std.testing.expect(completed);
}

test "cancellableBackoff returns false when sleep is cancelled" {
    var pool: std.Io.Threaded = .init(std.testing.allocator, .{});
    defer pool.deinit();
    // The probe cancels the very first sleep, so the 100 ms delay is never
    // actually waited out.
    const io = CancelSleepProbe.wrap(pool.io(), 1);
    const completed = cancellableBackoff(io, 100);
    try std.testing.expect(!completed);
    try std.testing.expectEqual(@as(usize, 1), CancelSleepProbe.sleep_calls);
}

test "cancellableBackoff propagates cancellation when called repeatedly" {
    var pool: std.Io.Threaded = .init(std.testing.allocator, .{});
    defer pool.deinit();
    // The probe trips on the third sleep. The two before it must complete
    // and report true, which pins that the helper does not latch after an
    // earlier non-cancelled sleep.
    const io = CancelSleepProbe.wrap(pool.io(), 3);
    for (0..2) |_| try std.testing.expect(cancellableBackoff(io, 0));
    try std.testing.expect(!cancellableBackoff(io, 0));
    try std.testing.expectEqual(@as(usize, 3), CancelSleepProbe.sleep_calls);
}
130 changes: 129 additions & 1 deletion src/core/services/supervisor.zig
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,12 @@ pub fn followLog(
while (!interrupted()) {
const n = f.readPositionalAll(io, &buf, offset) catch return SupervisorError.IoFailed;
if (n == 0) {
std.Io.sleep(io, std.Io.Duration.fromNanoseconds(@intCast(poll_ns)), .awake) catch {};
// Cancelled sleep means SIGINT (or another stop signal) reached
// the io subsystem before `interrupted()` flipped; bail rather
// than busy-spin until the flag catches up.
std.Io.sleep(io, std.Io.Duration.fromNanoseconds(@intCast(poll_ns)), .awake) catch |e| switch (e) {
error.Canceled => break,
};
continue;
}
writer.writeAll(buf[0..n]) catch return SupervisorError.IoFailed;
Expand Down Expand Up @@ -518,3 +523,126 @@ test "followLog flushes appended bytes between polls" {
try testing.expect(std.mem.indexOf(u8, aw.written(), "appended\n") != null);
try testing.expectEqual(@as(usize, 3), FollowProbe.calls);
}

// Test double for `std.Io`: clones the inner Io's vtable and replaces
// `sleep` with a counter that never blocks, so the 200 ms poll cadence
// doesn't pad the test runtime. Sleeps numbered below `cancel_at` succeed
// immediately; the `cancel_at`-th sleep and all later ones report
// `error.Canceled`. State is container-level, so `wrap` re-arms it.
const CancelSleepProbe = struct {
    var vtable: std.Io.VTable = undefined;
    var sleep_calls: usize = 0;
    var cancel_at: usize = 1;

    // Resets the counters and hands back an Io that keeps `inner`'s
    // userdata while routing calls through the patched vtable.
    fn wrap(inner: std.Io, cancel_at_call: usize) std.Io {
        cancel_at = cancel_at_call;
        sleep_calls = 0;
        vtable = inner.vtable.*;
        vtable.sleep = sleepMaybeCanceled;
        return .{ .vtable = &vtable, .userdata = inner.userdata };
    }

    // Records the call; returns normally until the threshold is reached.
    fn sleepMaybeCanceled(_: ?*anyopaque, _: std.Io.Timeout) std.Io.Cancelable!void {
        sleep_calls += 1;
        if (sleep_calls < cancel_at) return;
        return error.Canceled;
    }
};

test "followLog breaks when sleep reports cancellation on the first poll" {
    testIoInit();
    defer testIoDeinit();

    // Scratch directory: remove any existing tree (errors ignored), create
    // it, and delete it again when the test exits.
    const scratch = "/tmp/malt_supervisor_follow_cancel";
    std.Io.Dir.cwd().deleteTree(test_io, scratch) catch {};
    try std.Io.Dir.createDirAbsolute(test_io, scratch, .default_dir);
    defer std.Io.Dir.cwd().deleteTree(test_io, scratch) catch {};

    const log_path = scratch ++ "/sample.log";
    {
        const seed_file = try std.Io.Dir.createFileAbsolute(test_io, log_path, .{ .truncate = true });
        defer seed_file.close(test_io);
        try seed_file.writeStreamingAll(test_io, "seed\n");
    }

    var sink: std.Io.Writer.Allocating = .init(testing.allocator);
    defer sink.deinit();

    // Cancel the very first sleep.
    const cancel_io = CancelSleepProbe.wrap(test_io, 1);

    // stop_at is set high so that only sleep cancellation can end the loop.
    // If the loop ever spins past Canceled, FollowProbe.calls exceeds 1.
    FollowProbe.reset();
    FollowProbe.stop_at = 16;

    try followLog(cancel_io, testing.allocator, log_path, 0, &sink.writer, FollowProbe.cb);

    try testing.expectEqual(@as(usize, 1), CancelSleepProbe.sleep_calls);
    try testing.expectEqual(@as(usize, 1), FollowProbe.calls);
}

test "followLog breaks when sleep reports cancellation after several idle polls" {
    testIoInit();
    defer testIoDeinit();

    // Scratch directory: remove any existing tree (errors ignored), create
    // it, and delete it again when the test exits.
    const scratch = "/tmp/malt_supervisor_follow_cancel_late";
    std.Io.Dir.cwd().deleteTree(test_io, scratch) catch {};
    try std.Io.Dir.createDirAbsolute(test_io, scratch, .default_dir);
    defer std.Io.Dir.cwd().deleteTree(test_io, scratch) catch {};

    const log_path = scratch ++ "/sample.log";
    {
        const seed_file = try std.Io.Dir.createFileAbsolute(test_io, log_path, .{ .truncate = true });
        defer seed_file.close(test_io);
        try seed_file.writeStreamingAll(test_io, "seed\n");
    }

    var sink: std.Io.Writer.Allocating = .init(testing.allocator);
    defer sink.deinit();

    // The fourth sleep is the one that gets cancelled, so three idle polls
    // run first. This pins that the catch arm fires no matter how many
    // normal sleeps preceded it, and that the loop doesn't latch on the
    // earlier non-cancelled returns.
    const cancel_io = CancelSleepProbe.wrap(test_io, 4);

    FollowProbe.reset();
    FollowProbe.stop_at = 16;

    try followLog(cancel_io, testing.allocator, log_path, 0, &sink.writer, FollowProbe.cb);

    try testing.expectEqual(@as(usize, 4), CancelSleepProbe.sleep_calls);
    try testing.expectEqual(@as(usize, 4), FollowProbe.calls);
}

test "followLog flushes data appended before sleep cancellation lands" {
    testIoInit();
    defer testIoDeinit();

    // Scratch directory: remove any existing tree (errors ignored), create
    // it, and delete it again when the test exits.
    const scratch = "/tmp/malt_supervisor_follow_cancel_flush";
    std.Io.Dir.cwd().deleteTree(test_io, scratch) catch {};
    try std.Io.Dir.createDirAbsolute(test_io, scratch, .default_dir);
    defer std.Io.Dir.cwd().deleteTree(test_io, scratch) catch {};

    const log_path = scratch ++ "/sample.log";
    {
        const seed_file = try std.Io.Dir.createFileAbsolute(test_io, log_path, .{ .truncate = true });
        defer seed_file.close(test_io);
        try seed_file.writeStreamingAll(test_io, "seed\n");
    }

    var sink: std.Io.Writer.Allocating = .init(testing.allocator);
    defer sink.deinit();

    // The probe appends data on the first interrupt-check and the second
    // sleep is cancelled, so the appended bytes must already have reached
    // the writer by the time the loop bails.
    FollowProbe.reset();
    FollowProbe.append_path = log_path;
    FollowProbe.append_bytes = "tail-bytes\n";
    FollowProbe.append_at = 1;
    FollowProbe.stop_at = 16;

    const cancel_io = CancelSleepProbe.wrap(test_io, 2);

    try followLog(cancel_io, testing.allocator, log_path, 0, &sink.writer, FollowProbe.cb);

    const flushed = sink.written();
    try testing.expect(std.mem.indexOf(u8, flushed, "tail-bytes\n") != null);
    try testing.expectEqual(@as(usize, 2), CancelSleepProbe.sleep_calls);
}
70 changes: 66 additions & 4 deletions src/net/client.zig
Original file line number Diff line number Diff line change
Expand Up @@ -377,17 +377,23 @@ pub const HttpClient = struct {
if (classifyStatus(resp.status)) |dl_err| {
if (isTransientError(dl_err) and attempt < max_retries) {
resp.allocator.free(resp.body);
// Sleep is backoff jitter; interruption just retries sooner.
std.Io.sleep(self.io, std.Io.Duration.fromNanoseconds(@intCast(retry_delays_ms[attempt] * std.time.ns_per_ms)), .awake) catch {};
// Cancellation is single-shot per task in std.Io —
// swallowing it here means the caller's stop signal
// is consumed by the backoff and never reaches the
// next request, so propagate it as the result.
std.Io.sleep(self.io, std.Io.Duration.fromNanoseconds(@intCast(retry_delays_ms[attempt] * std.time.ns_per_ms)), .awake) catch |e| switch (e) {
error.Canceled => return error.Canceled,
};
attempt += 1;
continue;
}
}
return resp;
} else |err| {
if (attempt < max_retries) {
// Sleep is backoff jitter; interruption just retries sooner.
std.Io.sleep(self.io, std.Io.Duration.fromNanoseconds(@intCast(retry_delays_ms[attempt] * std.time.ns_per_ms)), .awake) catch {};
std.Io.sleep(self.io, std.Io.Duration.fromNanoseconds(@intCast(retry_delays_ms[attempt] * std.time.ns_per_ms)), .awake) catch |e| switch (e) {
error.Canceled => return error.Canceled,
};
attempt += 1;
continue;
}
Expand Down Expand Up @@ -712,3 +718,59 @@ test "shouldFireIdleWatchdog: cancellation fires regardless of elapsed time" {
try std.testing.expect(shouldFireIdleWatchdog(0, 0, 30, 600, true));
try std.testing.expect(shouldFireIdleWatchdog(1, 1, 999_999, 999_999, true));
}

// Test double for `std.Io`: duplicates the inner Io's vtable and installs
// a `sleep` that only counts calls, so the retry-table delays don't pad
// the test runtime. With `cancel_at = 1` the first sleep is cancelled;
// with `cancel_at = N` the first N-1 sleeps return normally and the Nth
// (and any after it) reports `error.Canceled`. State is container-level,
// so each `wrap` call re-arms the probe.
const CancelSleepProbe = struct {
    var vtable: std.Io.VTable = undefined;
    var sleep_calls: usize = 0;
    var cancel_at: usize = 1;

    // Resets the counters and returns an Io that reuses `inner`'s userdata
    // but dispatches through the patched vtable.
    fn wrap(inner: std.Io, cancel_at_call: usize) std.Io {
        sleep_calls = 0;
        cancel_at = cancel_at_call;
        vtable = inner.vtable.*;
        vtable.sleep = sleepMaybeCanceled;
        return .{ .vtable = &vtable, .userdata = inner.userdata };
    }

    // Bumps the call counter; succeeds instantly below the threshold.
    fn sleepMaybeCanceled(_: ?*anyopaque, _: std.Io.Timeout) std.Io.Cancelable!void {
        sleep_calls += 1;
        if (sleep_calls < cancel_at) return;
        return error.Canceled;
    }
};

test "doGetWithRetry surfaces sleep cancellation on the first backoff" {
    var pool: std.Io.Threaded = .init(std.testing.allocator, .{});
    defer pool.deinit();
    const cancel_io = CancelSleepProbe.wrap(pool.io(), 1);

    var client = HttpClient.init(cancel_io, std.process.Environ.empty, std.testing.allocator);
    defer client.deinit();

    // Nothing listens on 127.0.0.1:1, so the connect is expected to fail
    // fast with ECONNREFUSED and the retry sleep is reached on the first
    // attempt. If every backoff swallowed Canceled, the call would instead
    // burn all retries and resolve with the connect error.
    try std.testing.expectError(
        error.Canceled,
        client.get("http://127.0.0.1:1/nothing-listens-here"),
    );
    try std.testing.expectEqual(@as(usize, 1), CancelSleepProbe.sleep_calls);
}

test "doGetWithRetry surfaces sleep cancellation on a later backoff" {
    var pool: std.Io.Threaded = .init(std.testing.allocator, .{});
    defer pool.deinit();
    // The third sleep is the cancelled one and the first two return
    // normally. This exercises the retry counter together with the catch
    // arm, showing that propagation isn't tied to attempt 0 only.
    const cancel_io = CancelSleepProbe.wrap(pool.io(), 3);

    var client = HttpClient.init(cancel_io, std.process.Environ.empty, std.testing.allocator);
    defer client.deinit();

    try std.testing.expectError(
        error.Canceled,
        client.get("http://127.0.0.1:1/nothing-listens-here"),
    );
    try std.testing.expectEqual(@as(usize, 3), CancelSleepProbe.sleep_calls);
}
Loading
Loading