diff --git a/frameworks/ioxide/Dockerfile b/frameworks/ioxide/Dockerfile index 1feec6d53..55f27c8b4 100644 --- a/frameworks/ioxide/Dockerfile +++ b/frameworks/ioxide/Dockerfile @@ -1,4 +1,4 @@ -FROM mcr.microsoft.com/dotnet/sdk:10.0 AS build +FROM mcr.microsoft.com/dotnet/sdk:11.0-preview AS build WORKDIR /source COPY ioxide-arena.csproj ./ RUN dotnet restore @@ -8,7 +8,7 @@ RUN dotnet publish -c Release --no-self-contained -o /app/out # ioxide drives io_uring through direct libc syscalls (no liburing). The bench # harness runs containers with --security-opt seccomp=unconfined (required for # io_uring_setup/enter); engine="io_uring" makes validate.sh enable it too. -FROM mcr.microsoft.com/dotnet/runtime:10.0 +FROM mcr.microsoft.com/dotnet/runtime:11.0-preview WORKDIR /app COPY --from=build /app/out ./ EXPOSE 8080 8081 diff --git a/frameworks/ioxide/Handler.cs b/frameworks/ioxide/Handler.cs new file mode 100644 index 000000000..10b151437 --- /dev/null +++ b/frameworks/ioxide/Handler.cs @@ -0,0 +1,224 @@ +using ioxide; +using ioxide.file; +using ioxide.pg; +using ioxide.tls; +using ioxide.utils; + +namespace IoxideArena; + +internal static class Handler +{ + private static int _slab = 16 * 1024; + private static Dataset _dataSet = Dataset.Empty; + private static StaticAssets? _staticAssets; + private static Precompressed? _precompressed; + private static bool _hasPg; + private static bool _hasTls; + private static bool _hasCache; + + public static void Init(ServerConfig config, Dataset ds, StaticAssets? assets, Precompressed? precompressed, bool hasPg, bool hasTls, bool hasCache) + { + _slab = config.WriteSlabSize; + _dataSet = ds; + _staticAssets = assets; + _precompressed = precompressed; + _hasPg = hasPg; + _hasTls = hasTls; + _hasCache = hasCache; + } + + public static async Task HandleAsync(Reactor reactor, Connection conn) + { + var httpSession = new HttpSession(_dataSet, _staticAssets, _precompressed); + PgPool? pool = _hasPg ? reactor.GetService() : null; + ICrudCache? cache = _hasCache ? reactor.GetService() : null; + PgRowHandler rowSink = httpSession.AppendDbRow; // async-db rows + PgRowHandler listSink = httpSession.AppendCrudRow; // crud list rows + PgRowHandler itemSink = httpSession.CaptureCrudItem; // crud single item + TlsSession? tls = null; + + try + { + if (_hasTls && conn.ListenerPort == 8081) + { + // Handshake over the ring, then kTLS TX: outbound writes below are + // plaintext and the kernel produces the records. Inbound stays + // userspace: each slice decrypts through the session. The client's + // first request can ride in with its Finished, so feed it here - + // the send-first loop below answers it before blocking on a read. + tls = await reactor.GetService().AcceptAsync(conn); + httpSession.Feed(tls.DrainPlaintext()); + } + + // Send-first: respond to whatever is already parsed (a request bundled + // with the TLS handshake, or a prior read) before parking on the next + // read. A read-first loop would deadlock on the bundled-request case. + while (true) + { + // /async-db parks the parser: run the query (inline on this reactor's + // ring via ioxide.pg), stream rows into Out, then resume the carry - + // pipelined requests behind it are served in order. + while (httpSession.PendingDb) + { + httpSession.PendingDb = false; + if (pool != null) + { + httpSession.BeginDbResponse(); + await pool.QueryRowsAsync(httpSession.PendingDbSql(), rowSink); + httpSession.EndDbResponse(); + } + else + { + httpSession.WriteDbUnavailable(); + } + + if (httpSession.PendingDbClose) httpSession.WantClose = true; + else httpSession.ResumeFeed(); + } + + while (httpSession.PendingCrud != CrudKind.None) + { + CrudKind kind = httpSession.PendingCrud; + httpSession.PendingCrud = CrudKind.None; + + if (pool == null) + { + httpSession.WriteCrudUnavailable(); + } + else switch (kind) + { + case CrudKind.List: + httpSession.BeginCrudList(); + await httpSession.SubmitCrudList(pool, listSink); + httpSession.EndCrudList(); + break; + + case CrudKind.GetOne: + string key = httpSession.CacheKey(); + string? cached = cache != null ? await cache.GetAsync(key) : null; + if (cached != null) + { + httpSession.WriteCrudItemResponse(System.Text.Encoding.UTF8.GetBytes(cached), cacheHit: true); + } + else + { + httpSession.ResetCrudItem(); + await httpSession.SubmitCrudItem(pool, itemSink); + if (httpSession.CrudItemFound) + { + if (cache != null) + await cache.SetExAsync(key, System.Text.Encoding.UTF8.GetString(httpSession.CrudItemBody()), 1); + httpSession.WriteCrudItemResponse(httpSession.CrudItemBody(), cacheHit: false); + } + else + { + httpSession.WriteCrud404(); + } + } + break; + + case CrudKind.Create: + await httpSession.SubmitCrudInsert(pool); + httpSession.WriteCrudStatus("HTTP/1.1 201 Created\r\nContent-Length: 0\r\n"u8); + break; + + case CrudKind.Update: + await httpSession.SubmitCrudUpdate(pool); + if (cache != null) await cache.DelAsync(httpSession.CacheKey()); + httpSession.WriteCrudStatus("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n"u8); + break; + } + + if (httpSession.PendingCrudClose) httpSession.WantClose = true; + else httpSession.ResumeFeed(); + } + + // Baked static responses go straight to the wire (not through Out) - no extra copy, + // and Out never grows to the largest asset, so per-connection memory stays flat + // under load. Sent before Out, which preserves order (Direct is only set when it was + // the first response of the batch). + if (httpSession.HasDirect) + { + int dsent = 0; + while (dsent < httpSession.DirectLen) + { + int dchunk = Math.Min(httpSession.DirectLen - dsent, _slab); + WriteDirect(conn, httpSession, dsent, dchunk); + await conn.FlushAsync(); + dsent += dchunk; + } + httpSession.ClearDirect(); + } + + int sent = 0; + while (sent < httpSession.OutLen) + { + int chunk = Math.Min(httpSession.OutLen - sent, _slab); + conn.Write(httpSession.Out.AsSpan(sent, chunk)); + await conn.FlushAsync(); + sent += chunk; + } + httpSession.OutLen = 0; + + if (httpSession.WantClose || (tls?.Closed ?? false)) + return; + + RecvSnapshot snap = await conn.ReadAsync(); + FeedSlices(httpSession, conn, tls, snap); + if (snap.IsClosed) + { + httpSession.WantClose = true; + } + else + { + conn.ResetRead(); + } + } + } + catch (Exception ex) + { + Console.Error.WriteLine($"[r{reactor.Id}] http handler crash fd={conn.ClientFd}: {ex}"); + } + finally + { + tls?.Dispose(); + conn.DecRef(); + } + } + + // Copy one slab-sized slice of the direct (baked static) response into the connection's write + // slab - managed precompressed buffer or native identity response. Kept in a sync unsafe helper + // so the native pointer never crosses an await in the async handler. + private static unsafe void WriteDirect(Connection conn, HttpSession s, int off, int len) + { + if (s.DirectBytes != null) + { + conn.Write(s.DirectBytes.AsSpan(off, len)); + } + else + { + conn.Write(new ReadOnlySpan((void*)(s.DirectPtr + off), len)); + } + } + + private static unsafe void FeedSlices(HttpSession s, Connection conn, TlsSession? tls, in RecvSnapshot snap) + { + while (conn.TryGetItem(snap, out SpscRecvRing.Item item)) + { + if (!item.HasBuffer) + { + continue; + } + if (tls != null) + { + s.Feed(tls.Decrypt(item.Ptr, item.Len)); + } + else + { + s.Feed(item.AsSpan()); + } + + conn.ReturnBuffer(in item); + } + } +} \ No newline at end of file diff --git a/frameworks/ioxide/HttpSession.cs b/frameworks/ioxide/HttpSession.cs index 658af4c3f..c442db2db 100644 --- a/frameworks/ioxide/HttpSession.cs +++ b/frameworks/ioxide/HttpSession.cs @@ -66,10 +66,16 @@ public void Feed(ReadOnlySpan data) { int take = (int)Math.Min(PendingUploadRemaining, (long)data.Length); PendingUploadRemaining -= take; - if (PendingUploadRemaining > 0) return; // more body still to come; nothing buffered + if (PendingUploadRemaining > 0) + { + return; // more body still to come; nothing buffered + } FinishUpload(); // last byte counted - write the byte-count response data = data[take..]; // any remainder is the start of the next request - if (data.IsEmpty) return; + if (data.IsEmpty) + { + return; + } } AppendCarry(data); Pump(); @@ -81,7 +87,11 @@ private void FinishUpload() Span num = stackalloc byte[20]; Utf8Formatter.TryFormat(_uploadTotal, num, out int n); WriteResp(num[..n], _uploadClose); - if (_uploadClose) WantClose = true; + + if (_uploadClose) + { + WantClose = true; + } } private void Pump() @@ -91,12 +101,18 @@ private void Pump() && TryOne(_carry.AsSpan(pos, _carryLen - pos), out int consumed, out bool close)) { pos += consumed; - if (close && !PendingDb) { WantClose = true; break; } + if (close && !PendingDb) + { + WantClose = true; break; + } } if (pos > 0) { int rem = _carryLen - pos; - if (rem > 0) Array.Copy(_carry, pos, _carry, 0, rem); + if (rem > 0) + { + Array.Copy(_carry, pos, _carry, 0, rem); + } _carryLen = rem; } } diff --git a/frameworks/ioxide/Program.cs b/frameworks/ioxide/Program.cs index 6ef37e13f..b9e4ecbd9 100644 --- a/frameworks/ioxide/Program.cs +++ b/frameworks/ioxide/Program.cs @@ -70,6 +70,8 @@ private static int Main() Incremental = false, RecvBufferSize = recvKb * 1024, BufferRingEntries = ringEntries, + // 128 KB so a static response fits one slab and the handler sends it without chunk-flushing. + WriteSlabSize = 128 * 1024, }; var dsPath = Environment.GetEnvironmentVariable("IOXIDE_DATASET") ?? "/data/dataset.json"; @@ -146,30 +148,37 @@ private static int Main() var reactor = new Reactor(i, config); var pgOptions = pg; var redisOptions = redis; - reactor.OnStart = rr => + + reactor.OnStart = reactorInstance => { if (pgOptions != null) { - PgPool.Start(rr, pgOptions); + PgPool.Start(reactorInstance, pgOptions); + // Crud cache, shared across reactors (inproc/stackexchange) or per-reactor (ioxide). ICrudCache cache = cacheBackend switch { - "ioxide" => new IoxideRedisCache(RedisPool.Start(rr, redisOptions!)), + "ioxide" => new IoxideRedisCache(RedisPool.Start(reactorInstance, redisOptions!)), "stackexchange" => new StackExchangeCache(mux!.GetDatabase()), _ => new InProcCache(memCache!), }; - rr.AddService(cache); + reactorInstance.AddService(cache); } if (tls) { - TlsService.Start(rr, new TlsOptions { CertificatePath = certPath, KeyPath = keyPath }); + TlsService.Start(reactorInstance, new TlsOptions { CertificatePath = certPath, KeyPath = keyPath }); } }; + reactor.Handle = Handler.HandleAsync; threads[i] = new Thread(reactor.Run) { Name = $"reactor-{i}", IsBackground = false }; threads[i].Start(); } - foreach (var t in threads) t.Join(); + foreach (var t in threads) + { + t.Join(); + } + return 0; } @@ -180,218 +189,11 @@ private static string ResolveIPv4(string host) foreach (var addr in Dns.GetHostAddresses(host)) { if (addr.AddressFamily == System.Net.Sockets.AddressFamily.InterNetwork) - return addr.ToString(); - } - return "127.0.0.1"; - } -} - -internal static class Handler -{ - private static int _slab = 16 * 1024; - private static Dataset _ds = Dataset.Empty; - private static StaticAssets? _assets; - private static Precompressed? _precompressed; - private static bool _hasPg; - private static bool _hasTls; - private static bool _hasCache; - - public static void Init(ServerConfig config, Dataset ds, StaticAssets? assets, Precompressed? precompressed, bool hasPg, bool hasTls, bool hasCache) - { - _slab = config.WriteSlabSize; - _ds = ds; - _assets = assets; - _precompressed = precompressed; - _hasPg = hasPg; - _hasTls = hasTls; - _hasCache = hasCache; - } - - public static async Task HandleAsync(Reactor reactor, Connection conn) - { - var s = new HttpSession(_ds, _assets, _precompressed); - PgPool? pool = _hasPg ? reactor.GetService() : null; - ICrudCache? cache = _hasCache ? reactor.GetService() : null; - PgRowHandler rowSink = s.AppendDbRow; // async-db rows - PgRowHandler listSink = s.AppendCrudRow; // crud list rows - PgRowHandler itemSink = s.CaptureCrudItem; // crud single item - TlsSession? tls = null; - - try - { - if (_hasTls && conn.ListenerPort == 8081) - { - // Handshake over the ring, then kTLS TX: outbound writes below are - // plaintext and the kernel produces the records. Inbound stays - // userspace: each slice decrypts through the session. The client's - // first request can ride in with its Finished, so feed it here - - // the send-first loop below answers it before blocking on a read. - tls = await reactor.GetService().AcceptAsync(conn); - s.Feed(tls.DrainPlaintext()); - } - - // Send-first: respond to whatever is already parsed (a request bundled - // with the TLS handshake, or a prior read) before parking on the next - // read. A read-first loop would deadlock on the bundled-request case. - while (true) { - // /async-db parks the parser: run the query (inline on this reactor's - // ring via ioxide.pg), stream rows into Out, then resume the carry - - // pipelined requests behind it are served in order. - while (s.PendingDb) - { - s.PendingDb = false; - if (pool != null) - { - s.BeginDbResponse(); - await pool.QueryRowsAsync(s.PendingDbSql(), rowSink); - s.EndDbResponse(); - } - else - { - s.WriteDbUnavailable(); - } - - if (s.PendingDbClose) s.WantClose = true; - else s.ResumeFeed(); - } - - while (s.PendingCrud != CrudKind.None) - { - CrudKind kind = s.PendingCrud; - s.PendingCrud = CrudKind.None; - - if (pool == null) - { - s.WriteCrudUnavailable(); - } - else switch (kind) - { - case CrudKind.List: - s.BeginCrudList(); - await s.SubmitCrudList(pool, listSink); - s.EndCrudList(); - break; - - case CrudKind.GetOne: - string key = s.CacheKey(); - string? cached = cache != null ? await cache.GetAsync(key) : null; - if (cached != null) - { - s.WriteCrudItemResponse(System.Text.Encoding.UTF8.GetBytes(cached), cacheHit: true); - } - else - { - s.ResetCrudItem(); - await s.SubmitCrudItem(pool, itemSink); - if (s.CrudItemFound) - { - if (cache != null) - await cache.SetExAsync(key, System.Text.Encoding.UTF8.GetString(s.CrudItemBody()), 1); - s.WriteCrudItemResponse(s.CrudItemBody(), cacheHit: false); - } - else - { - s.WriteCrud404(); - } - } - break; - - case CrudKind.Create: - await s.SubmitCrudInsert(pool); - s.WriteCrudStatus("HTTP/1.1 201 Created\r\nContent-Length: 0\r\n"u8); - break; - - case CrudKind.Update: - await s.SubmitCrudUpdate(pool); - if (cache != null) await cache.DelAsync(s.CacheKey()); - s.WriteCrudStatus("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n"u8); - break; - } - - if (s.PendingCrudClose) s.WantClose = true; - else s.ResumeFeed(); - } - - // Baked static responses go straight to the wire (not through Out) - no extra copy, - // and Out never grows to the largest asset, so per-connection memory stays flat - // under load. Sent before Out, which preserves order (Direct is only set when it was - // the first response of the batch). - if (s.HasDirect) - { - int dsent = 0; - while (dsent < s.DirectLen) - { - int dchunk = Math.Min(s.DirectLen - dsent, _slab); - WriteDirect(conn, s, dsent, dchunk); - await conn.FlushAsync(); - dsent += dchunk; - } - s.ClearDirect(); - } - - int sent = 0; - while (sent < s.OutLen) - { - int chunk = Math.Min(s.OutLen - sent, _slab); - conn.Write(s.Out.AsSpan(sent, chunk)); - await conn.FlushAsync(); - sent += chunk; - } - s.OutLen = 0; - - if (s.WantClose || (tls?.Closed ?? false)) - return; - - RecvSnapshot snap = await conn.ReadAsync(); - FeedSlices(s, conn, tls, snap); - if (snap.IsClosed) - { - s.WantClose = true; - } - else - { - conn.ResetRead(); - } - } - } - catch (Exception ex) - { - Console.Error.WriteLine($"[r{reactor.Id}] http handler crash fd={conn.ClientFd}: {ex}"); - } - finally - { - tls?.Dispose(); - conn.DecRef(); - } - } - - // Copy one slab-sized slice of the direct (baked static) response into the connection's write - // slab - managed precompressed buffer or native identity response. Kept in a sync unsafe helper - // so the native pointer never crosses an await in the async handler. - private static unsafe void WriteDirect(Connection conn, HttpSession s, int off, int len) - { - if (s.DirectBytes != null) conn.Write(s.DirectBytes.AsSpan(off, len)); - else conn.Write(new ReadOnlySpan((void*)(s.DirectPtr + off), len)); - } - - private static unsafe void FeedSlices(HttpSession s, Connection conn, TlsSession? tls, in RecvSnapshot snap) - { - while (conn.TryGetItem(snap, out SpscRecvRing.Item item)) - { - if (!item.HasBuffer) - { - continue; - } - if (tls != null) - { - s.Feed(tls.Decrypt(item.Ptr, item.Len)); - } - else - { - s.Feed(item.AsSpan()); + return addr.ToString(); } - conn.ReturnBuffer(in item); } + + return "127.0.0.1"; } } diff --git a/frameworks/ioxide/README.md b/frameworks/ioxide/README.md index 50eb6f877..3ee318ae4 100644 --- a/frameworks/ioxide/README.md +++ b/frameworks/ioxide/README.md @@ -1,7 +1,7 @@ # ioxide [ioxide](https://github.com/MDA2AV/ioxide) - a shared-nothing io_uring runtime for .NET - -consumed as its published NuGet packages (`ioxide`, `ioxide.pg`, `ioxide.file` 0.0.5), not +consumed as its published NuGet packages (`ioxide`, `ioxide.pg`, `ioxide.file` 0.0.8), not vendored. One ring per reactor thread: SO_REUSEPORT + multishot accept, multishot recv into a provided buffer ring, inline IValueTaskSource continuations, raw-syscall io_uring (no liburing). diff --git a/frameworks/ioxide/ioxide-arena.csproj b/frameworks/ioxide/ioxide-arena.csproj index a9de31d16..570d368e4 100644 --- a/frameworks/ioxide/ioxide-arena.csproj +++ b/frameworks/ioxide/ioxide-arena.csproj @@ -2,10 +2,11 @@ Exe - net10.0 + net11.0 enable enable true + runtime-async=on IoxideArena ioxide-arena true @@ -13,11 +14,11 @@ - - - - - + + + + +