Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 71 additions & 19 deletions Examples/Files/StaticExample.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
using ioxide;
using ioxide.file;

using Microsoft.Win32.SafeHandles;

namespace Examples.Files;

/// <summary>
/// Static files over the per-reactor asset cache. A <see cref="StaticAssets.Lease"/> keeps the
/// snapshot alive for the whole request (so a concurrent reload can't free the fd mid-send). Small
/// files are served from a baked HTTP response with no I/O; larger files are read off the ring in
/// chunks. Misses are 404.
/// Static files over the shared asset cache. A <see cref="StaticAssets.Lease"/> keeps the snapshot
/// alive for the whole request (so a concurrent reload can't free the fd mid-send). Every lookup is
/// revalidated against disk (size + mtime + inode): an unchanged small file is served from its baked
/// HTTP response with no I/O; a file that was edited or atomically replaced is opened fresh and
/// streamed live, so the cache never serves stale bytes (it re-bakes on the next reload). Larger
/// files are read off the ring in chunks. Misses are 404.
/// </summary>
public static class StaticExample
{
Expand All @@ -29,16 +33,30 @@ public static async Task Handle(Reactor r, Connection conn)
{
if (!lease.TryGet(path, out AssetCache.Asset asset))
{
Http.WriteText(conn, 404, "Not Found", $"no asset {path}");
await conn.FlushAsync();
}
else if (asset.Response != 0)
{
await SendNative(conn, asset.Response, asset.ResponseLength);
await NotFound(conn, path);
}
else
{
await SendFromDisk(conn, readers, asset);
// Revalidate the cached asset against disk. The baked response is the hot path
// only while the file is unchanged; an edit or atomic rename is served live, so
// RAM never goes stale.
bool fresh = AssetCache.IsFresh(asset, out bool exists, out long size);
if (!exists)
{
await NotFound(conn, path); // vanished
}
else if (fresh && asset.Response != 0)
{
await SendNative(conn, asset.Response, asset.ResponseLength); // baked hot path
}
else if (fresh)
{
await SendFromDisk(conn, readers, asset, asset.Fd, asset.Length); // large, unchanged
}
else
{
await SendChangedFromDisk(conn, readers, asset, size); // changed -> live
}
}
}

Expand All @@ -56,28 +74,36 @@ public static async Task Handle(Reactor r, Connection conn)
}
}

// Read a large (non-baked) asset off the ring, in successive chunks for files bigger than the buffer.
private static async Task SendFromDisk(Connection conn, RingPool<AssetReader> readers, AssetCache.Asset asset)
private static async Task NotFound(Connection conn, string path)
{
Http.WriteText(conn, 404, "Not Found", $"no asset {path}");
await conn.FlushAsync();
}

// Stream an asset off the ring from <paramref name="fd"/>, framing Content-Length from
// <paramref name="totalLength"/>. Files bigger than the reader's buffer are read in successive
// chunks at advancing offsets, so they're served whole instead of truncated.
private static async Task SendFromDisk(Connection conn, RingPool<AssetReader> readers, AssetCache.Asset asset, int fd, long totalLength)
{
AssetReader reader = await readers.RentAsync();

try
{
int first = await reader.ReadAsync(asset.Fd, offset: 0);
int first = await reader.ReadAsync(fd, offset: 0);
if (first < 0)
{
Http.WriteText(conn, 500, "Internal Server Error", "read failed");
await conn.FlushAsync();
return;
}

WriteHeader(conn, asset);
WriteHeader(conn, asset, totalLength);
await SendNative(conn, reader.Buffer, first);

long offset = first;
while (offset < asset.Length)
while (offset < totalLength)
{
int read = await reader.ReadAsync(asset.Fd, offset);
int read = await reader.ReadAsync(fd, offset);
if (read <= 0)
{
break;
Expand All @@ -93,11 +119,37 @@ private static async Task SendFromDisk(Connection conn, RingPool<AssetReader> re
}
}

// Serve a file whose on-disk version no longer matches the baked snapshot: open the current path
// fresh (so an atomic rename resolves to the new inode, not the cached fd) and stream it live.
private static async Task SendChangedFromDisk(Connection conn, RingPool<AssetReader> readers, AssetCache.Asset asset, long size)
{
SafeFileHandle handle;
try
{
handle = System.IO.File.OpenHandle(asset.Path, FileMode.Open, FileAccess.Read, FileShare.Read);
}
catch
{
await NotFound(conn, asset.Path);
return;
}

try
{
int fd = (int)handle.DangerousGetHandle();
await SendFromDisk(conn, readers, asset, fd, size);
}
finally
{
handle.Dispose();
}
}

// Format the 200 header into a stack buffer (kept in a sync method - a Span can't cross an await).
private static void WriteHeader(Connection conn, AssetCache.Asset asset)
private static void WriteHeader(Connection conn, AssetCache.Asset asset, long bodyLength)
{
Span<byte> header = stackalloc byte[256];
conn.Write(header[..AssetCache.WriteResponseHeader(header, asset.Path, (int)asset.Length)]);
conn.Write(header[..AssetCache.WriteResponseHeader(header, asset.Path, (int)bodyLength)]);
}

// Copy native memory through the write slab in slab-sized chunks and flush each.
Expand Down
5 changes: 5 additions & 0 deletions Examples/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ private readonly record struct Example(
"raw-pipes" => new Example(Configs.Shared, Raw.PipesExample.Handle, null),
"raw-incremental" => new Example(Configs.Incremental, Raw.IncrementalExample.Handle, null),

// Large-body plaintext, to exercise the response send path with big payloads. raw-zc flips on
// IORING_OP_SEND_ZC (ServerConfig.ZeroCopySend); raw-big is the plain-SEND baseline.
"raw-big" => new Example(Configs.Shared with { WriteSlabSize = 256 * 1024 }, Raw.BigExample.Handle, null),
"raw-zc" => new Example(Configs.Shared with { WriteSlabSize = 256 * 1024, ZeroCopySend = true }, Raw.BigExample.Handle, null),

"pg-shared" => WithPg(Configs.Shared, Pg.SharedExample.Handle),
"pg-pipes" => WithPg(Configs.Shared, Pg.PipesExample.Handle),
"pg-incremental" => WithPg(Configs.Incremental, Pg.IncrementalExample.Handle),
Expand Down
59 changes: 59 additions & 0 deletions Examples/Raw/BigExample.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
using System.Text;
using ioxide;

namespace Examples.Raw;

/// <summary>
/// Large fixed plaintext body, to exercise the response send path with big payloads - in particular
/// the zero-copy send strategy (ServerConfig.ZeroCopySend). The body is a deterministic byte pattern
/// so a client can checksum it and compare plain SEND vs SEND_ZC output byte-for-byte. Under
/// keep-alive this also exercises slab reuse: the handler only writes the next response after
/// FlushAsync completes, which for a ZC send is the F_NOTIF (buffer-release) - so a correct
/// implementation never recycles the slab while the kernel still owns it.
/// </summary>
public static class BigExample
{
public const int BodySize = 100 * 1024; // 100 KB - well past the small-payload regime

// Built once: header + deterministic body (byte i = i % 251; 251 is prime, so the pattern isn't
// 256-aligned and a one-byte slip would show up in the checksum).
private static readonly byte[] Response = BuildResponse();

private static byte[] BuildResponse()
{
byte[] head = Encoding.ASCII.GetBytes(
$"HTTP/1.1 200 OK\r\nContent-Type: application/octet-stream\r\nContent-Length: {BodySize}\r\n\r\n");

byte[] response = new byte[head.Length + BodySize];
head.CopyTo(response, 0);
for (int i = 0; i < BodySize; i++)
{
response[head.Length + i] = (byte)(i % 251);
}
return response;
}

public static async Task Handle(Reactor r, Connection conn)
{
while (true)
{
var snapshot = await conn.ReadAsync();

while (conn.TryGetItem(snapshot, out var item))
{
if (item.HasBuffer) conn.ReturnBuffer(in item);
}

conn.Write(Response);
await conn.FlushAsync(); // ZC: returns on the F_NOTIF, so the slab is free before the next Write

if (snapshot.IsClosed)
{
conn.DecRef();
return;
}

conn.ResetRead();
}
}
}
71 changes: 57 additions & 14 deletions Playground/Handlers.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Buffers.Text;
using System.IO.Pipelines;
using System.Text;
using Microsoft.Win32.SafeHandles;
using ioxide;
using ioxide.file;
using ioxide.pg;
Expand Down Expand Up @@ -176,15 +177,29 @@ public static async Task File(Reactor reactor, Connection conn)
conn.Write(NotFound);
await conn.FlushAsync();
}
else if (asset.Response != 0)
{
// Hot path: the whole response (header + body) is baked into the
// snapshot - no file I/O, no header formatting, no allocation.
await SendChunked(conn, asset.Response, asset.ResponseLength);
}
else
{
await SendFromDisk(conn, readers, asset);
// Revalidate against disk (size + mtime + inode). The baked response is the
// hot path only while the file is unchanged; an edit or atomic rename is
// served live instead, so RAM never goes stale. (Re-bakes on Reload().)
bool fresh = AssetCache.IsFresh(asset, out bool exists, out long size);
if (!exists)
{
conn.Write(NotFound); // vanished
await conn.FlushAsync();
}
else if (fresh && asset.Response != 0)
{
await SendChunked(conn, asset.Response, asset.ResponseLength); // baked hot path
}
else if (fresh)
{
await SendFromDisk(conn, readers, asset, asset.Fd, asset.Length); // large, unchanged
}
else
{
await SendChangedFromDisk(conn, readers, asset, size); // changed -> live
}
}
}

Expand All @@ -198,28 +213,29 @@ public static async Task File(Reactor reactor, Connection conn)
}
}

// Stream a large (non-baked) asset off the ring. Files bigger than the reader's buffer are read
// in successive chunks at advancing offsets, so they're served whole instead of truncated.
private static async Task SendFromDisk(Connection conn, RingPool<AssetReader> readers, AssetCache.Asset asset)
// Stream an asset off the ring from <paramref name="fd"/>, framing Content-Length from
// <paramref name="totalLength"/>. Files bigger than the reader's buffer are read in successive
// chunks at advancing offsets, so they're served whole instead of truncated.
private static async Task SendFromDisk(Connection conn, RingPool<AssetReader> readers, AssetCache.Asset asset, int fd, long totalLength)
{
AssetReader reader = await readers.RentAsync();
try
{
int first = await reader.ReadAsync(asset.Fd, offset: 0);
int first = await reader.ReadAsync(fd, offset: 0);
if (first < 0)
{
conn.Write(ServerError);
await conn.FlushAsync();
return;
}

WriteAssetHeader(conn, asset, (int)asset.Length); // full length up front
WriteAssetHeader(conn, asset, (int)totalLength); // full length up front
await SendChunked(conn, reader.Buffer, first);

long offset = first;
while (offset < asset.Length)
while (offset < totalLength)
{
int read = await reader.ReadAsync(asset.Fd, offset);
int read = await reader.ReadAsync(fd, offset);
if (read <= 0)
{
break; // EOF or mid-stream error; the response is already committed
Expand All @@ -234,6 +250,33 @@ private static async Task SendFromDisk(Connection conn, RingPool<AssetReader> re
}
}

// Serve a file whose on-disk version no longer matches the baked snapshot: open the current path
// fresh (so an atomic rename resolves to the new inode, not the cached fd) and stream it live.
private static async Task SendChangedFromDisk(Connection conn, RingPool<AssetReader> readers, AssetCache.Asset asset, long size)
{
SafeFileHandle handle;
try
{
handle = System.IO.File.OpenHandle(asset.Path, FileMode.Open, FileAccess.Read, FileShare.Read);
}
catch
{
conn.Write(NotFound);
await conn.FlushAsync();
return;
}

try
{
int fd = (int)handle.DangerousGetHandle();
await SendFromDisk(conn, readers, asset, fd, size);
}
finally
{
handle.Dispose();
}
}

// Drain the recv (the raw/pg handlers don't parse the request).
private static void Drain(Connection conn, RecvSnapshot snapshot)
{
Expand Down
Loading
Loading