diff --git a/Examples/Files/StaticExample.cs b/Examples/Files/StaticExample.cs index 9ba088f..880d4ce 100644 --- a/Examples/Files/StaticExample.cs +++ b/Examples/Files/StaticExample.cs @@ -1,13 +1,17 @@ using ioxide; using ioxide.file; +using Microsoft.Win32.SafeHandles; + namespace Examples.Files; /// -/// Static files over the per-reactor asset cache. A keeps the -/// snapshot alive for the whole request (so a concurrent reload can't free the fd mid-send). Small -/// files are served from a baked HTTP response with no I/O; larger files are read off the ring in -/// chunks. Misses are 404. +/// Static files over the shared asset cache. A keeps the snapshot +/// alive for the whole request (so a concurrent reload can't free the fd mid-send). Every lookup is +/// revalidated against disk (size + mtime + inode): an unchanged small file is served from its baked +/// HTTP response with no I/O; a file that was edited or atomically replaced is opened fresh and +/// streamed live, so the cache never serves stale bytes (it re-bakes on the next reload). Larger +/// files are read off the ring in chunks. Misses are 404. /// public static class StaticExample { @@ -29,16 +33,30 @@ public static async Task Handle(Reactor r, Connection conn) { if (!lease.TryGet(path, out AssetCache.Asset asset)) { - Http.WriteText(conn, 404, "Not Found", $"no asset {path}"); - await conn.FlushAsync(); - } - else if (asset.Response != 0) - { - await SendNative(conn, asset.Response, asset.ResponseLength); + await NotFound(conn, path); } else { - await SendFromDisk(conn, readers, asset); + // Revalidate the cached asset against disk. The baked response is the hot path + // only while the file is unchanged; an edit or atomic rename is served live, so + // RAM never goes stale. + bool fresh = AssetCache.IsFresh(asset, out bool exists, out long size); + if (!exists) + { + await NotFound(conn, path); // vanished + } + else if (fresh && asset.Response != 0) + { + await SendNative(conn, asset.Response, asset.ResponseLength); // baked hot path + } + else if (fresh) + { + await SendFromDisk(conn, readers, asset, asset.Fd, asset.Length); // large, unchanged + } + else + { + await SendChangedFromDisk(conn, readers, asset, size); // changed -> live + } } } @@ -56,14 +74,22 @@ public static async Task Handle(Reactor r, Connection conn) } } - // Read a large (non-baked) asset off the ring, in successive chunks for files bigger than the buffer. - private static async Task SendFromDisk(Connection conn, RingPool readers, AssetCache.Asset asset) + private static async Task NotFound(Connection conn, string path) + { + Http.WriteText(conn, 404, "Not Found", $"no asset {path}"); + await conn.FlushAsync(); + } + + // Stream an asset off the ring from , framing Content-Length from + // . Files bigger than the reader's buffer are read in successive + // chunks at advancing offsets, so they're served whole instead of truncated. + private static async Task SendFromDisk(Connection conn, RingPool readers, AssetCache.Asset asset, int fd, long totalLength) { AssetReader reader = await readers.RentAsync(); try { - int first = await reader.ReadAsync(asset.Fd, offset: 0); + int first = await reader.ReadAsync(fd, offset: 0); if (first < 0) { Http.WriteText(conn, 500, "Internal Server Error", "read failed"); @@ -71,13 +97,13 @@ private static async Task SendFromDisk(Connection conn, RingPool re return; } - WriteHeader(conn, asset); + WriteHeader(conn, asset, totalLength); await SendNative(conn, reader.Buffer, first); long offset = first; - while (offset < asset.Length) + while (offset < totalLength) { - int read = await reader.ReadAsync(asset.Fd, offset); + int read = await reader.ReadAsync(fd, offset); if (read <= 0) { break; @@ -93,11 +119,37 @@ private static async Task SendFromDisk(Connection conn, RingPool re } } + // Serve a file whose on-disk version no longer matches the baked snapshot: open the current path + // fresh (so an atomic rename resolves to the new inode, not the cached fd) and stream it live. + private static async Task SendChangedFromDisk(Connection conn, RingPool readers, AssetCache.Asset asset, long size) + { + SafeFileHandle handle; + try + { + handle = System.IO.File.OpenHandle(asset.Path, FileMode.Open, FileAccess.Read, FileShare.Read); + } + catch + { + await NotFound(conn, asset.Path); + return; + } + + try + { + int fd = (int)handle.DangerousGetHandle(); + await SendFromDisk(conn, readers, asset, fd, size); + } + finally + { + handle.Dispose(); + } + } + // Format the 200 header into a stack buffer (kept in a sync method - a Span can't cross an await). - private static void WriteHeader(Connection conn, AssetCache.Asset asset) + private static void WriteHeader(Connection conn, AssetCache.Asset asset, long bodyLength) { Span header = stackalloc byte[256]; - conn.Write(header[..AssetCache.WriteResponseHeader(header, asset.Path, (int)asset.Length)]); + conn.Write(header[..AssetCache.WriteResponseHeader(header, asset.Path, (int)bodyLength)]); } // Copy native memory through the write slab in slab-sized chunks and flush each. diff --git a/Examples/Program.cs b/Examples/Program.cs index 9337441..7fd1928 100644 --- a/Examples/Program.cs +++ b/Examples/Program.cs @@ -73,6 +73,11 @@ private readonly record struct Example( "raw-pipes" => new Example(Configs.Shared, Raw.PipesExample.Handle, null), "raw-incremental" => new Example(Configs.Incremental, Raw.IncrementalExample.Handle, null), + // Large-body plaintext, to exercise the response send path with big payloads. raw-zc flips on + // IORING_OP_SEND_ZC (ServerConfig.ZeroCopySend); raw-big is the plain-SEND baseline. + "raw-big" => new Example(Configs.Shared with { WriteSlabSize = 256 * 1024 }, Raw.BigExample.Handle, null), + "raw-zc" => new Example(Configs.Shared with { WriteSlabSize = 256 * 1024, ZeroCopySend = true }, Raw.BigExample.Handle, null), + "pg-shared" => WithPg(Configs.Shared, Pg.SharedExample.Handle), "pg-pipes" => WithPg(Configs.Shared, Pg.PipesExample.Handle), "pg-incremental" => WithPg(Configs.Incremental, Pg.IncrementalExample.Handle), diff --git a/Examples/Raw/BigExample.cs b/Examples/Raw/BigExample.cs new file mode 100644 index 0000000..82633fb --- /dev/null +++ b/Examples/Raw/BigExample.cs @@ -0,0 +1,59 @@ +using System.Text; +using ioxide; + +namespace Examples.Raw; + +/// +/// Large fixed plaintext body, to exercise the response send path with big payloads - in particular +/// the zero-copy send strategy (ServerConfig.ZeroCopySend). The body is a deterministic byte pattern +/// so a client can checksum it and compare plain SEND vs SEND_ZC output byte-for-byte. Under +/// keep-alive this also exercises slab reuse: the handler only writes the next response after +/// FlushAsync completes, which for a ZC send is the F_NOTIF (buffer-release) - so a correct +/// implementation never recycles the slab while the kernel still owns it. +/// +public static class BigExample +{ + public const int BodySize = 100 * 1024; // 100 KB - well past the small-payload regime + + // Built once: header + deterministic body (byte i = i % 251; 251 is prime, so the pattern isn't + // 256-aligned and a one-byte slip would show up in the checksum). + private static readonly byte[] Response = BuildResponse(); + + private static byte[] BuildResponse() + { + byte[] head = Encoding.ASCII.GetBytes( + $"HTTP/1.1 200 OK\r\nContent-Type: application/octet-stream\r\nContent-Length: {BodySize}\r\n\r\n"); + + byte[] response = new byte[head.Length + BodySize]; + head.CopyTo(response, 0); + for (int i = 0; i < BodySize; i++) + { + response[head.Length + i] = (byte)(i % 251); + } + return response; + } + + public static async Task Handle(Reactor r, Connection conn) + { + while (true) + { + var snapshot = await conn.ReadAsync(); + + while (conn.TryGetItem(snapshot, out var item)) + { + if (item.HasBuffer) conn.ReturnBuffer(in item); + } + + conn.Write(Response); + await conn.FlushAsync(); // ZC: returns on the F_NOTIF, so the slab is free before the next Write + + if (snapshot.IsClosed) + { + conn.DecRef(); + return; + } + + conn.ResetRead(); + } + } +} diff --git a/Playground/Handlers.cs b/Playground/Handlers.cs index cd9ab81..17aa2bc 100644 --- a/Playground/Handlers.cs +++ b/Playground/Handlers.cs @@ -1,6 +1,7 @@ using System.Buffers.Text; using System.IO.Pipelines; using System.Text; +using Microsoft.Win32.SafeHandles; using ioxide; using ioxide.file; using ioxide.pg; @@ -176,15 +177,29 @@ public static async Task File(Reactor reactor, Connection conn) conn.Write(NotFound); await conn.FlushAsync(); } - else if (asset.Response != 0) - { - // Hot path: the whole response (header + body) is baked into the - // snapshot - no file I/O, no header formatting, no allocation. - await SendChunked(conn, asset.Response, asset.ResponseLength); - } else { - await SendFromDisk(conn, readers, asset); + // Revalidate against disk (size + mtime + inode). The baked response is the + // hot path only while the file is unchanged; an edit or atomic rename is + // served live instead, so RAM never goes stale. (Re-bakes on Reload().) + bool fresh = AssetCache.IsFresh(asset, out bool exists, out long size); + if (!exists) + { + conn.Write(NotFound); // vanished + await conn.FlushAsync(); + } + else if (fresh && asset.Response != 0) + { + await SendChunked(conn, asset.Response, asset.ResponseLength); // baked hot path + } + else if (fresh) + { + await SendFromDisk(conn, readers, asset, asset.Fd, asset.Length); // large, unchanged + } + else + { + await SendChangedFromDisk(conn, readers, asset, size); // changed -> live + } } } @@ -198,14 +213,15 @@ public static async Task File(Reactor reactor, Connection conn) } } - // Stream a large (non-baked) asset off the ring. Files bigger than the reader's buffer are read - // in successive chunks at advancing offsets, so they're served whole instead of truncated. - private static async Task SendFromDisk(Connection conn, RingPool readers, AssetCache.Asset asset) + // Stream an asset off the ring from , framing Content-Length from + // . Files bigger than the reader's buffer are read in successive + // chunks at advancing offsets, so they're served whole instead of truncated. + private static async Task SendFromDisk(Connection conn, RingPool readers, AssetCache.Asset asset, int fd, long totalLength) { AssetReader reader = await readers.RentAsync(); try { - int first = await reader.ReadAsync(asset.Fd, offset: 0); + int first = await reader.ReadAsync(fd, offset: 0); if (first < 0) { conn.Write(ServerError); @@ -213,13 +229,13 @@ private static async Task SendFromDisk(Connection conn, RingPool re return; } - WriteAssetHeader(conn, asset, (int)asset.Length); // full length up front + WriteAssetHeader(conn, asset, (int)totalLength); // full length up front await SendChunked(conn, reader.Buffer, first); long offset = first; - while (offset < asset.Length) + while (offset < totalLength) { - int read = await reader.ReadAsync(asset.Fd, offset); + int read = await reader.ReadAsync(fd, offset); if (read <= 0) { break; // EOF or mid-stream error; the response is already committed @@ -234,6 +250,33 @@ private static async Task SendFromDisk(Connection conn, RingPool re } } + // Serve a file whose on-disk version no longer matches the baked snapshot: open the current path + // fresh (so an atomic rename resolves to the new inode, not the cached fd) and stream it live. + private static async Task SendChangedFromDisk(Connection conn, RingPool readers, AssetCache.Asset asset, long size) + { + SafeFileHandle handle; + try + { + handle = System.IO.File.OpenHandle(asset.Path, FileMode.Open, FileAccess.Read, FileShare.Read); + } + catch + { + conn.Write(NotFound); + await conn.FlushAsync(); + return; + } + + try + { + int fd = (int)handle.DangerousGetHandle(); + await SendFromDisk(conn, readers, asset, fd, size); + } + finally + { + handle.Dispose(); + } + } + // Drain the recv (the raw/pg handlers don't parse the request). private static void Drain(Connection conn, RecvSnapshot snapshot) { diff --git a/ioxide.file/AssetCache.cs b/ioxide.file/AssetCache.cs index c6e8fff..cb160a9 100644 --- a/ioxide.file/AssetCache.cs +++ b/ioxide.file/AssetCache.cs @@ -22,7 +22,7 @@ public sealed class AssetCache : IDisposable /// precomputed in native memory ( bytes - send it as-is); /// otherwise read positionally off the ring and build the response. /// - public readonly record struct Asset(int Fd, string Path, long Length, nint Response, int ResponseLength); + public readonly record struct Asset(int Fd, string Path, long Length, nint Response, int ResponseLength, long MtimeSec, uint MtimeNsec, ulong Ino); private readonly Dictionary _assets; private readonly SafeFileHandle[] _handles; @@ -78,13 +78,65 @@ public AssetCache(string rootDir, int maxCachedFileBytes = DefaultMaxCachedFileB int fd = (int)handle.DangerousGetHandle(); string key = "/" + Path.GetRelativePath(RootDir, path).Replace('\\', '/'); - _assets[key] = new Asset(fd, path, length, response, responseLength); + // Freshness baseline: a request later re-statx's this path and serves the baked response + // only while size + mtime + inode still match. + TryStat(path, out _, out long mtimeSec, out uint mtimeNsec, out ulong ino); + + _assets[key] = new Asset(fd, path, length, response, responseLength, mtimeSec, mtimeNsec, ino); } _handles = handles.ToArray(); _responses = responses.ToArray(); } + // --- Revalidation (statx) ------------------------------------------------------------------- + // A baked response is served only while the file on disk still matches what was baked (size + + // mtime + inode), so an in-place edit or an atomic rename is picked up live instead of serving + // stale RAM. The baked block is never mutated here (the cache is shared across reactors) - it + // only re-bakes on Reload(). + + private const int AT_FDCWD = -100; + private const uint STATX_BASIC_STATS = 0x000007ffU; + + [DllImport("libc", EntryPoint = "statx", SetLastError = true)] + private static extern unsafe int statx(int dirfd, [MarshalAs(UnmanagedType.LPUTF8Str)] string path, int flags, uint mask, byte* buf); + + /// + /// statx a path into (size, mtime, inode). False when the file can't be stat'd (e.g. deleted). + /// Offsets are from struct statx (linux/stat.h): ino @32, size @40, mtime.sec @112, + /// mtime.nsec @120. + /// + internal static unsafe bool TryStat(string path, out long size, out long mtimeSec, out uint mtimeNsec, out ulong ino) + { + byte* buf = stackalloc byte[256]; + if (statx(AT_FDCWD, path, 0, STATX_BASIC_STATS, buf) != 0) + { + size = 0; mtimeSec = 0; mtimeNsec = 0; ino = 0; + return false; + } + + ino = *(ulong*)(buf + 32); + size = (long)*(ulong*)(buf + 40); + mtimeSec = *(long*)(buf + 112); + mtimeNsec = *(uint*)(buf + 120); + return true; + } + + /// + /// True when still matches the file on disk (size + mtime + inode). + /// is false when the file is gone; is + /// the live size, used to frame the response when serving a changed file. + /// + public static bool IsFresh(in Asset asset, out bool exists, out long currentSize) + { + exists = TryStat(asset.Path, out currentSize, out long ms, out uint mn, out ulong ino); + return exists + && currentSize == asset.Length + && ms == asset.MtimeSec + && mn == asset.MtimeNsec + && ino == asset.Ino; + } + // Bake "HTTP/1.1 200 OK ..." + body into one contiguous native block. The body is read first // so a file truncated under us still yields a consistent Content-Length. private static unsafe (nint Response, int Length) BuildResponse(SafeFileHandle handle, int bodyLength, string path) diff --git a/ioxide.file/ioxide.file.csproj b/ioxide.file/ioxide.file.csproj index d79c828..1da27ab 100644 --- a/ioxide.file/ioxide.file.csproj +++ b/ioxide.file/ioxide.file.csproj @@ -8,7 +8,7 @@ ioxide.file ioxide.file - 0.0.7 + 0.0.8 MDA2AV File serving for the ioxide io_uring runtime: immutable asset snapshots with baked responses, pooled positional ring reads, atomic reloads. MIT diff --git a/ioxide.pg/ioxide.pg.csproj b/ioxide.pg/ioxide.pg.csproj index 159f014..4c6bf6a 100644 --- a/ioxide.pg/ioxide.pg.csproj +++ b/ioxide.pg/ioxide.pg.csproj @@ -8,7 +8,7 @@ ioxide.pg ioxide.pg - 0.0.7 + 0.0.8 MDA2AV Postgres driver for the ioxide io_uring runtime: pooled ring-native connections per reactor, ring-native connect and handshake, inline completion resume. MIT diff --git a/ioxide.redis/ioxide.redis.csproj b/ioxide.redis/ioxide.redis.csproj index 93cfb95..63e76c9 100644 --- a/ioxide.redis/ioxide.redis.csproj +++ b/ioxide.redis/ioxide.redis.csproj @@ -8,7 +8,7 @@ ioxide.redis ioxide.redis - 0.0.7 + 0.0.8 MDA2AV Redis client for the ioxide io_uring runtime: pooled ring-native connections per reactor, full RESP2 protocol, a generic command API plus typed helpers (strings, keys, hashes, lists, sets, sorted sets, pub/sub, transactions, scripting), and pipelining. Inline completion resume. MIT diff --git a/ioxide.tls/ioxide.tls.csproj b/ioxide.tls/ioxide.tls.csproj index e8c58db..d767738 100644 --- a/ioxide.tls/ioxide.tls.csproj +++ b/ioxide.tls/ioxide.tls.csproj @@ -8,7 +8,7 @@ ioxide.tls ioxide.tls - 0.0.7 + 0.0.8 MDA2AV TLS for the ioxide io_uring runtime: OpenSSL handshake driven over the ring, then kernel TLS (kTLS) transmit offload - handlers keep writing plaintext through the same connection API. Requires Linux kTLS (tls module) and OpenSSL 3. MIT diff --git a/ioxide/Connection/Connection.Write.cs b/ioxide/Connection/Connection.Write.cs index cb1daa9..25b9692 100644 --- a/ioxide/Connection/Connection.Write.cs +++ b/ioxide/Connection/Connection.Write.cs @@ -14,6 +14,11 @@ public sealed unsafe partial class Connection : IValueTaskSource, IBufferWriter< internal int WriteTail; internal int WriteInFlight; + // Outstanding IORING_CQE_F_NOTIF completions for in-flight zero-copy sends. The slab can't be + // recycled until this hits zero (the kernel still owns the buffer until the notif). Always 0 for + // plain SEND; reset on Clear(). + internal int ZcNotifPending; + private readonly UnmanagedMemoryManager _manager; private ManualResetValueTaskSourceCore _flushSignal = new() @@ -138,6 +143,7 @@ internal void CompleteFlush() WriteHead = 0; WriteTail = 0; WriteInFlight = 0; + ZcNotifPending = 0; Volatile.Write(ref _flushInProgress, 0); Interlocked.Exchange(ref _flushArmed, 0); diff --git a/ioxide/Connection/Connection.cs b/ioxide/Connection/Connection.cs index 6d3716d..70f12a1 100644 --- a/ioxide/Connection/Connection.cs +++ b/ioxide/Connection/Connection.cs @@ -12,12 +12,34 @@ public sealed unsafe partial class Connection /// The listener port this connection was accepted on; set per accept. public ushort ListenerPort { get; internal set; } + /// + /// The reactor's injected send strategy for this connection (plain SEND or SEND_ZC), bound at + /// accept from . kTLS forces it back to plain via the + /// setter. + /// + internal delegate* SendFn; + + private uint _sendOpFlags = 0x100; // MSG_WAITALL + /// /// op_flags for sends on this connection. Defaults to MSG_WAITALL (the kernel coalesces short /// sends into one CQE). kTLS rejects MSG_WAITALL, so ioxide.tls clears this after the - /// handoff; the reactor's partial-send loop preserves correctness either way. + /// handoff; the reactor's partial-send loop preserves correctness either way. Clearing it (the + /// kTLS marker) also pins this connection to plain SEND - SEND_ZC through the TLS ULP gives no + /// benefit and may be rejected. /// - public uint SendOpFlags { get; set; } = 0x100; // MSG_WAITALL + public uint SendOpFlags + { + get => _sendOpFlags; + set + { + _sendOpFlags = value; + if (value == 0) + { + SendFn = &Reactor.SubmitSendPlain; + } + } + } // Bumped on Clear(); the low 16 bits serve as the IVTS token so stale awaiters // from a previous pool life are detectable. @@ -84,6 +106,7 @@ internal void Clear() WriteHead = 0; WriteTail = 0; WriteInFlight = 0; + ZcNotifPending = 0; _readSignal.Reset(); _flushSignal.Reset(); diff --git a/ioxide/Reactor/Reactor.Incremental.cs b/ioxide/Reactor/Reactor.Incremental.cs index 650224b..93efc51 100644 --- a/ioxide/Reactor/Reactor.Incremental.cs +++ b/ioxide/Reactor/Reactor.Incremental.cs @@ -241,7 +241,7 @@ private void DispatchIncremental(in IoUringCqe cqe) } case KindSend: - OnSendCompletion(fd, gen, cqe.res); + OnSendCompletion(fd, gen, cqe.res, cqe.flags); return; case KindClient: diff --git a/ioxide/Reactor/Reactor.cs b/ioxide/Reactor/Reactor.cs index 38446d9..55dc37b 100644 --- a/ioxide/Reactor/Reactor.cs +++ b/ioxide/Reactor/Reactor.cs @@ -23,6 +23,10 @@ public sealed unsafe partial class Reactor private int[] _listenFds = []; private ushort[] _listenPorts = []; private readonly ServerConfig _config; + + // The response-send strategy, chosen once from config (ZeroCopySend) and injected per-connection + // at accept via Connection.SendFn. Keeps the send hot path free of a per-call branch. + private readonly delegate* _sendFn; private readonly ushort _port; private readonly uint _ringEntries; private readonly bool _incremental; @@ -99,6 +103,7 @@ public Reactor(int id, ServerConfig config) ConnBufRingEntries = config.ConnBufRingEntries; IncRecvBufferSize = (uint)config.IncRecvBufferSize; _pool = new Stack(config.PoolMax); + _sendFn = config.ZeroCopySend ? &SubmitSendZc : &SubmitSendPlain; } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -209,7 +214,7 @@ internal void EnqueueFlush(int fd, int gen) Connection? conn = ConnAt(fd, (ushort)gen); if (conn != null) { - SubmitSend(fd, (ushort)gen, conn.WriteBuffer, (uint)conn.WriteInFlight, conn.SendOpFlags); + conn.SendFn(this, fd, (ushort)gen, conn.WriteBuffer, (uint)conn.WriteInFlight, conn.SendOpFlags); } return; } @@ -267,7 +272,7 @@ private void DrainFlushQ() { continue; } - SubmitSend(fd, gen, conn.WriteBuffer, (uint)conn.WriteInFlight, conn.SendOpFlags); + conn.SendFn(this, fd, gen, conn.WriteBuffer, (uint)conn.WriteInFlight, conn.SendOpFlags); } } @@ -448,7 +453,7 @@ private void Dispatch(in IoUringCqe cqe) } case KindSend: - OnSendCompletion(fd, gen, cqe.res); + OnSendCompletion(fd, gen, cqe.res, cqe.flags); return; case KindClient: @@ -466,6 +471,7 @@ private void Dispatch(in IoUringCqe cqe) : new Connection(this, clientFd, _config.WriteSlabSize, _config.RecvQueueEntries); Track(clientFd, conn); conn.InitRefs(); + conn.SendFn = _sendFn; // config default; kTLS overrides to plain on handshake conn.ListenerPort = PortOf(fd); SubmitRecvMultishot(clientFd, (ushort)conn.Generation, BgId); @@ -495,14 +501,27 @@ private void Dispatch(in IoUringCqe cqe) } } - // Shared by both loops. - private void OnSendCompletion(int fd, ushort gen, int res) + // Shared by both loops. Handles plain SEND (one CQE) and SEND_ZC (a data CQE carrying + // IORING_CQE_F_MORE, then a separate IORING_CQE_F_NOTIF once the kernel releases the slab). + private void OnSendCompletion(int fd, ushort gen, int res, uint cqeFlags) { Connection? conn = ConnAt(fd, gen); if (conn == null) { return; // stale CQE - never touch the fd's new tenant } + + // Zero-copy buffer-release notification: the kernel is done with the slab. Recycle once the + // data is fully sent and no further notifs are outstanding. + if ((cqeFlags & IORING_CQE_F_NOTIF) != 0) + { + if (--conn.ZcNotifPending == 0 && conn.WriteHead >= conn.WriteInFlight) + { + conn.CompleteFlush(); + } + return; + } + if (res <= 0) { _connections[fd] = null; @@ -512,13 +531,26 @@ private void OnSendCompletion(int fd, ushort gen, int res) return; } conn.WriteHead += res; + + // A zero-copy send posts its data CQE with F_MORE and a notif will follow; hold the slab until + // that notif arrives. Plain SEND never sets F_MORE, so this is a no-op for it. + if ((cqeFlags & IORING_CQE_F_MORE) != 0) + { + conn.ZcNotifPending++; + } + if (conn.WriteHead < conn.WriteInFlight) { - // Partial send (rare with MSG_WAITALL): resubmit the remainder. - SubmitSend(fd, gen, conn.WriteBuffer + conn.WriteHead, (uint)(conn.WriteInFlight - conn.WriteHead), conn.SendOpFlags); + // Partial send (rare with MSG_WAITALL): resubmit the remainder via the injected sender. + conn.SendFn(this, fd, gen, conn.WriteBuffer + conn.WriteHead, (uint)(conn.WriteInFlight - conn.WriteHead), conn.SendOpFlags); return; } - conn.CompleteFlush(); + + // Data fully sent: plain SEND recycles now; a ZC send waits for its outstanding notif(s). + if (conn.ZcNotifPending == 0) + { + conn.CompleteFlush(); + } } private void OnWakeCompletion(bool more) @@ -627,11 +659,20 @@ private void SubmitRecvMultishot(int fd, ushort gen, ushort bgid) sqe->user_data = Tag(KindRecv, gen, fd); } - private void SubmitSend(int fd, ushort gen, byte* buf, uint len, uint opFlags) + // The two injectable send strategies (see _sendFn / Connection.SendFn). Static so they can be + // taken as function pointers; the chosen one is bound per-connection at accept. + internal static void SubmitSendPlain(Reactor r, int fd, ushort gen, byte* buf, uint len, uint opFlags) + => SubmitSendImpl(r, IORING_OP_SEND, fd, gen, buf, len, opFlags); + + internal static void SubmitSendZc(Reactor r, int fd, ushort gen, byte* buf, uint len, uint opFlags) + => SubmitSendImpl(r, IORING_OP_SEND_ZC, fd, gen, buf, len, opFlags); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static void SubmitSendImpl(Reactor r, byte opcode, int fd, ushort gen, byte* buf, uint len, uint opFlags) { - IoUringSqe* sqe = GetSqeOrFlush(); + IoUringSqe* sqe = r.GetSqeOrFlush(); Unsafe.InitBlockUnaligned(sqe, 0, 64); - sqe->opcode = IORING_OP_SEND; + sqe->opcode = opcode; sqe->fd = fd; sqe->addr = (ulong)buf; sqe->len = len; diff --git a/ioxide/ServerConfig.cs b/ioxide/ServerConfig.cs index 78da816..5d97765 100644 --- a/ioxide/ServerConfig.cs +++ b/ioxide/ServerConfig.cs @@ -27,6 +27,13 @@ public sealed record ServerConfig public int WriteSlabSize { get; init; } = 16 * 1024; public int PoolMax { get; init; } = 1024; + // Inject IORING_OP_SEND_ZC (zero-copy send) for the response path instead of IORING_OP_SEND. + // Trades the in-kernel payload copy for page-pinning plus a second (F_NOTIF) completion per send, + // so it only pays off for large responses - leave off for small-payload workloads. The sender is + // chosen once per connection at accept; kTLS connections always fall back to plain SEND (the + // kernel re-buffers to encrypt, so zero-copy buys nothing there). + public bool ZeroCopySend { get; init; } = false; + // Per-connection SPSC recv queue depth (power of two); overflow closes the connection. public int RecvQueueEntries { get; init; } = 64; diff --git a/ioxide/io_uring/Native.cs b/ioxide/io_uring/Native.cs index 63043a3..e4652c0 100644 --- a/ioxide/io_uring/Native.cs +++ b/ioxide/io_uring/Native.cs @@ -21,6 +21,11 @@ public static unsafe class Native { public const byte IORING_OP_WRITE = 23; public const byte IORING_OP_SEND = 26; public const byte IORING_OP_RECV = 27; + + // Zero-copy send (kernel 6.0+): the NIC DMAs straight from the user buffer instead of the kernel + // copying into skbuff. Posts the usual completion (with IORING_CQE_F_MORE set) and then a second + // CQE (IORING_CQE_F_NOTIF) once the buffer can be reused. + public const byte IORING_OP_SEND_ZC = 47; public const uint IORING_ENTER_GETEVENTS = 1u << 0; public const long IORING_OFF_SQ_RING = 0; public const long IORING_OFF_SQES = 0x10000000; @@ -36,6 +41,7 @@ public static unsafe class Native { public const byte IOSQE_BUFFER_SELECT = 1 << 5; public const uint IORING_CQE_F_BUFFER = 1u << 0; public const uint IORING_CQE_F_MORE = 1u << 1; + public const uint IORING_CQE_F_NOTIF = 1u << 3; // zero-copy send: buffer is free to reuse public const int IORING_CQE_BUFFER_SHIFT = 16; public const uint IORING_REGISTER_PBUF_RING = 22; public const uint IORING_UNREGISTER_PBUF_RING = 23; diff --git a/ioxide/ioxide.csproj b/ioxide/ioxide.csproj index d506262..e9fe362 100644 --- a/ioxide/ioxide.csproj +++ b/ioxide/ioxide.csproj @@ -8,7 +8,7 @@ ioxide ioxide - 0.0.7 + 0.0.8 MDA2AV A shared-nothing io_uring runtime for .NET: one ring per reactor thread, inline completions, zero native dependencies. The engine - reactor, connection, and the IRingHost client seam. MIT