Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions frameworks/ioxide/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM mcr.microsoft.com/dotnet/sdk:10.0 AS build
FROM mcr.microsoft.com/dotnet/sdk:11.0-preview AS build
WORKDIR /source
COPY ioxide-arena.csproj ./
RUN dotnet restore
Expand All @@ -8,7 +8,7 @@ RUN dotnet publish -c Release --no-self-contained -o /app/out
# ioxide drives io_uring through direct libc syscalls (no liburing). The bench
# harness runs containers with --security-opt seccomp=unconfined (required for
# io_uring_setup/enter); engine="io_uring" makes validate.sh enable it too.
FROM mcr.microsoft.com/dotnet/runtime:10.0
FROM mcr.microsoft.com/dotnet/runtime:11.0-preview
WORKDIR /app
COPY --from=build /app/out ./
EXPOSE 8080 8081
Expand Down
224 changes: 224 additions & 0 deletions frameworks/ioxide/Handler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
using ioxide;
using ioxide.file;
using ioxide.pg;
using ioxide.tls;
using ioxide.utils;

namespace IoxideArena;

internal static class Handler
{
private static int _slab = 16 * 1024;
private static Dataset _dataSet = Dataset.Empty;
private static StaticAssets? _staticAssets;
private static Precompressed? _precompressed;
private static bool _hasPg;
private static bool _hasTls;
private static bool _hasCache;

public static void Init(ServerConfig config, Dataset ds, StaticAssets? assets, Precompressed? precompressed, bool hasPg, bool hasTls, bool hasCache)
{
_slab = config.WriteSlabSize;
_dataSet = ds;
_staticAssets = assets;
_precompressed = precompressed;
_hasPg = hasPg;
_hasTls = hasTls;
_hasCache = hasCache;
}

public static async Task HandleAsync(Reactor reactor, Connection conn)
{
var httpSession = new HttpSession(_dataSet, _staticAssets, _precompressed);
PgPool? pool = _hasPg ? reactor.GetService<PgPool>() : null;
ICrudCache? cache = _hasCache ? reactor.GetService<ICrudCache>() : null;
PgRowHandler rowSink = httpSession.AppendDbRow; // async-db rows
PgRowHandler listSink = httpSession.AppendCrudRow; // crud list rows
PgRowHandler itemSink = httpSession.CaptureCrudItem; // crud single item
TlsSession? tls = null;

try
{
if (_hasTls && conn.ListenerPort == 8081)
{
// Handshake over the ring, then kTLS TX: outbound writes below are
// plaintext and the kernel produces the records. Inbound stays
// userspace: each slice decrypts through the session. The client's
// first request can ride in with its Finished, so feed it here -
// the send-first loop below answers it before blocking on a read.
tls = await reactor.GetService<TlsService>().AcceptAsync(conn);
httpSession.Feed(tls.DrainPlaintext());
}

// Send-first: respond to whatever is already parsed (a request bundled
// with the TLS handshake, or a prior read) before parking on the next
// read. A read-first loop would deadlock on the bundled-request case.
while (true)
{
// /async-db parks the parser: run the query (inline on this reactor's
// ring via ioxide.pg), stream rows into Out, then resume the carry -
// pipelined requests behind it are served in order.
while (httpSession.PendingDb)
{
httpSession.PendingDb = false;
if (pool != null)
{
httpSession.BeginDbResponse();
await pool.QueryRowsAsync(httpSession.PendingDbSql(), rowSink);
httpSession.EndDbResponse();
}
else
{
httpSession.WriteDbUnavailable();
}

if (httpSession.PendingDbClose) httpSession.WantClose = true;
else httpSession.ResumeFeed();
}

while (httpSession.PendingCrud != CrudKind.None)
{
CrudKind kind = httpSession.PendingCrud;
httpSession.PendingCrud = CrudKind.None;

if (pool == null)
{
httpSession.WriteCrudUnavailable();
}
else switch (kind)
{
case CrudKind.List:
httpSession.BeginCrudList();
await httpSession.SubmitCrudList(pool, listSink);
httpSession.EndCrudList();
break;

case CrudKind.GetOne:
string key = httpSession.CacheKey();
string? cached = cache != null ? await cache.GetAsync(key) : null;
if (cached != null)
{
httpSession.WriteCrudItemResponse(System.Text.Encoding.UTF8.GetBytes(cached), cacheHit: true);
}
else
{
httpSession.ResetCrudItem();
await httpSession.SubmitCrudItem(pool, itemSink);
if (httpSession.CrudItemFound)
{
if (cache != null)
await cache.SetExAsync(key, System.Text.Encoding.UTF8.GetString(httpSession.CrudItemBody()), 1);
httpSession.WriteCrudItemResponse(httpSession.CrudItemBody(), cacheHit: false);
}
else
{
httpSession.WriteCrud404();
}
}
break;

case CrudKind.Create:
await httpSession.SubmitCrudInsert(pool);
httpSession.WriteCrudStatus("HTTP/1.1 201 Created\r\nContent-Length: 0\r\n"u8);
break;

case CrudKind.Update:
await httpSession.SubmitCrudUpdate(pool);
if (cache != null) await cache.DelAsync(httpSession.CacheKey());
httpSession.WriteCrudStatus("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n"u8);
break;
}

if (httpSession.PendingCrudClose) httpSession.WantClose = true;
else httpSession.ResumeFeed();
}

// Baked static responses go straight to the wire (not through Out) - no extra copy,
// and Out never grows to the largest asset, so per-connection memory stays flat
// under load. Sent before Out, which preserves order (Direct is only set when it was
// the first response of the batch).
if (httpSession.HasDirect)
{
int dsent = 0;
while (dsent < httpSession.DirectLen)
{
int dchunk = Math.Min(httpSession.DirectLen - dsent, _slab);
WriteDirect(conn, httpSession, dsent, dchunk);
await conn.FlushAsync();
dsent += dchunk;
}
httpSession.ClearDirect();
}

int sent = 0;
while (sent < httpSession.OutLen)
{
int chunk = Math.Min(httpSession.OutLen - sent, _slab);
conn.Write(httpSession.Out.AsSpan(sent, chunk));
await conn.FlushAsync();
sent += chunk;
}
httpSession.OutLen = 0;

if (httpSession.WantClose || (tls?.Closed ?? false))
return;

RecvSnapshot snap = await conn.ReadAsync();
FeedSlices(httpSession, conn, tls, snap);
if (snap.IsClosed)
{
httpSession.WantClose = true;
}
else
{
conn.ResetRead();
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine($"[r{reactor.Id}] http handler crash fd={conn.ClientFd}: {ex}");
}
finally
{
tls?.Dispose();
conn.DecRef();
}
}

// Copy one slab-sized slice of the direct (baked static) response into the connection's write
// slab - managed precompressed buffer or native identity response. Kept in a sync unsafe helper
// so the native pointer never crosses an await in the async handler.
private static unsafe void WriteDirect(Connection conn, HttpSession s, int off, int len)
{
if (s.DirectBytes != null)
{
conn.Write(s.DirectBytes.AsSpan(off, len));
}
else
{
conn.Write(new ReadOnlySpan<byte>((void*)(s.DirectPtr + off), len));
}
}

private static unsafe void FeedSlices(HttpSession s, Connection conn, TlsSession? tls, in RecvSnapshot snap)
{
while (conn.TryGetItem(snap, out SpscRecvRing.Item item))
{
if (!item.HasBuffer)
{
continue;
}
if (tls != null)
{
s.Feed(tls.Decrypt(item.Ptr, item.Len));
}
else
{
s.Feed(item.AsSpan());
}

conn.ReturnBuffer(in item);
}
}
}
26 changes: 21 additions & 5 deletions frameworks/ioxide/HttpSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,16 @@ public void Feed(ReadOnlySpan<byte> data)
{
int take = (int)Math.Min(PendingUploadRemaining, (long)data.Length);
PendingUploadRemaining -= take;
if (PendingUploadRemaining > 0) return; // more body still to come; nothing buffered
if (PendingUploadRemaining > 0)
{
return; // more body still to come; nothing buffered
}
FinishUpload(); // last byte counted - write the byte-count response
data = data[take..]; // any remainder is the start of the next request
if (data.IsEmpty) return;
if (data.IsEmpty)
{
return;
}
}
AppendCarry(data);
Pump();
Expand All @@ -81,7 +87,11 @@ private void FinishUpload()
Span<byte> num = stackalloc byte[20];
Utf8Formatter.TryFormat(_uploadTotal, num, out int n);
WriteResp(num[..n], _uploadClose);
if (_uploadClose) WantClose = true;

if (_uploadClose)
{
WantClose = true;
}
}

private void Pump()
Expand All @@ -91,12 +101,18 @@ private void Pump()
&& TryOne(_carry.AsSpan(pos, _carryLen - pos), out int consumed, out bool close))
{
pos += consumed;
if (close && !PendingDb) { WantClose = true; break; }
if (close && !PendingDb)
{
WantClose = true; break;
}
}
if (pos > 0)
{
int rem = _carryLen - pos;
if (rem > 0) Array.Copy(_carry, pos, _carry, 0, rem);
if (rem > 0)
{
Array.Copy(_carry, pos, _carry, 0, rem);
}
_carryLen = rem;
}
}
Expand Down
Loading