Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 40 additions & 22 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,28 +1,13 @@
- Rename Connection to IO

```ruby
io = machine.io(fd)
l = io.read_line(4)
io.write('foo')
```

- Add `IO#fd`/`IO#target` method

- Add `UM#inspect` (show size, modes)
- Add `UM::IO#inspect` (show target, mode, pending bytes)

## Reimplement multishot read/recv using buffer pool

- remove `#setup_buffer_ring` method
- use buffer pool, just like UM::Connection

## immediate

- Add `IO#http_xxx` methods
- `#http_read_request_headers()`
- `#http_read_body(content_length)` (-1 means chunked TE)

- Add tests for support for Set in `machine#await`
- Add tests for support for Set, Array in `machine#join`
- Add `UM#read_file` for reading entire file
- Add `UM#write_file` for writing entire file
- Rename stream methods: `:fd`, `:socket`, `:ssl`

## Balancing I/O with the runqueue

Expand All @@ -48,7 +33,7 @@
- debounce

```ruby
debouncer = machine.debounce { }
debouncer = machine.debounce { ... }
```

- happy eyeballs algo for TCP connect
Expand Down Expand Up @@ -104,8 +89,6 @@
When doing a `call`, we need to provide a mailbox for the response. can this be
automatic?

##

## Syntax / pattern for launching/supervising multiple operations

Select (see above):
Expand All @@ -125,3 +108,38 @@ machine.shift_select(*queues) #=> [result, queue]
# ['1.1.1.1:80', '2.2.2.2:80']
tcp_connect_he(*addrs)
```

## Character scanning in UM::IO

```c
// bitmaps for character types can be generated with a bit of Ruby:
//
// def t(r); (0..255).map { [it].pack('c') =~ r ? 1 : 0 }; end
// def tn(r); (0..255).map { [it].pack('c') =~ r ? 0 : 1 }; end
// def u64(bits); bits.reverse.join.to_i(2); end
// def p(a); a.each_slice(64).map { u64(it) }; end

// usage:
//
// p(t(/[a-zA-Z0-9]/)).map { format('%016X', it) }


// /[a-zA-Z0-9]/
uint64_t alpha_numeric[] = [
0x000000000000FFC0,
0x7FFFFFE07FFFFFE0,
0x0000000000000000,
0x0000000000000000
];

// HTTP method: /[a-zA-Z]/ (3-12 characters)
// header-key: /[a-zA-Z\-]/ ()
// path: /^($/

// check if character is in bitmap
inline int test_char(char c, uint64 *bitmap) {
return bitmap[c / 64] & (1UL << (c % 64));
}


```
2 changes: 0 additions & 2 deletions docs/um_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@
resume value.
- `select(rfds, wfds, efds)` - selects ready fds from the given readable,
writable and exeptable fds.
- `send_bundle(fd, bgid, *strings)` - sends a bundle of buffers to the given fd
using the given buffer group id.
- `send(fd, buffer, len, flags)` - sends to the given fd from the given buffer.
- `sendv(fd, *buffers)` - sends multiple buffers to the given fd.
- `setsockopt(fd, level, opt, value)` - sets a socket option.
Expand Down
96 changes: 21 additions & 75 deletions ext/um/um.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,6 @@ inline void um_teardown(struct um *machine) {
if (machine->sidecar_mode) um_sidecar_teardown(machine);
if (machine->sidecar_signal) free(machine->sidecar_signal);

for (unsigned i = 0; i < machine->buffer_ring_count; i++) {
struct buf_ring_descriptor *desc = machine->buffer_rings + i;
io_uring_free_buf_ring(&machine->ring, desc->br, desc->buf_count, i);
free(desc->buf_base);
}
machine->buffer_ring_count = 0;
io_uring_queue_exit(&machine->ring);
machine->ring_initialized = 0;

Expand Down Expand Up @@ -523,7 +517,6 @@ struct op_ctx {
struct um *machine;
struct um_op *op;
int fd;
int bgid;

struct um_queue *queue;
void *read_buf;
Expand Down Expand Up @@ -870,25 +863,6 @@ VALUE um_sendv(struct um *machine, int fd, int argc, VALUE *argv) {
return ret;
}

VALUE um_send_bundle(struct um *machine, int fd, int bgid, VALUE strings) {
um_add_strings_to_buffer_ring(machine, bgid, strings);
struct um_op *op = um_op_acquire(machine);
um_prep_op(machine, op, OP_SEND_BUNDLE, 2, 0);
struct io_uring_sqe *sqe = um_get_sqe(machine, op);
io_uring_prep_send_bundle(sqe, fd, 0, MSG_NOSIGNAL | MSG_WAITALL);
sqe->flags |= IOSQE_BUFFER_SELECT;
sqe->buf_group = bgid;

VALUE ret = um_yield(machine);

if (likely(um_verify_op_completion(machine, op, true))) ret = INT2NUM(op->result.res);
um_op_release(machine, op);

RAISE_IF_EXCEPTION(ret);
RB_GC_GUARD(ret);
return ret;
}

VALUE um_recv(struct um *machine, int fd, VALUE buffer, size_t maxlen, int flags) {
void *ptr = um_prepare_read_buffer(buffer, maxlen, 0);
struct um_op *op = um_op_acquire(machine);
Expand Down Expand Up @@ -1461,72 +1435,44 @@ VALUE um_accept_into_queue(struct um *machine, int fd, VALUE queue) {
return rb_ensure(accept_into_queue_start, (VALUE)&ctx, multishot_complete, (VALUE)&ctx);
}

int um_read_each_singleshot_loop(struct op_ctx *ctx) {
struct buf_ring_descriptor *desc = ctx->machine->buffer_rings + ctx->bgid;
ctx->read_maxlen = desc->buf_size;
ctx->read_buf = malloc(desc->buf_size);
int total = 0;

while (1) {
um_prep_op(ctx->machine, ctx->op, OP_READ, 2, 0);
struct io_uring_sqe *sqe = um_get_sqe(ctx->machine, ctx->op);
io_uring_prep_read(sqe, ctx->fd, ctx->read_buf, ctx->read_maxlen, -1);

VALUE ret = um_yield(ctx->machine);

if (likely(um_verify_op_completion(ctx->machine, ctx->op, true))) {
VALUE buf = rb_str_new(ctx->read_buf, ctx->op->result.res);
total += ctx->op->result.res;
rb_yield(buf);
RB_GC_GUARD(buf);
}
else {
RAISE_IF_EXCEPTION(ret);
return 0;
}
RB_GC_GUARD(ret);
}
return 0;
}

// // returns true if more results are expected
int read_recv_each_multishot_process_result(struct op_ctx *ctx, struct um_op_result *result, int *total) {
inline int read_recv_each_multishot_process_result(struct op_ctx *ctx, struct um_op_result *result, int *total) {
if (result->res == 0)
return false;

*total += result->res;
VALUE buf = um_read_from_buffer_ring(ctx->machine, ctx->bgid, result->res, result->flags);
rb_yield(buf);
RB_GC_GUARD(buf);

// TTY devices might not support multishot reads:
// https://github.com/axboe/liburing/issues/1185. We detect this by checking
// if the F_MORE flag is absent, then switch to single shot mode.
if (unlikely(!(result->flags & IORING_CQE_F_MORE))) {
*total += um_read_each_singleshot_loop(ctx);
return false;
if (likely(result->segment)) {
VALUE buf = rb_str_new(result->segment->ptr, result->segment->len);
um_segment_checkin(ctx->machine, result->segment);
result->segment = NULL;
rb_yield(buf);
RB_GC_GUARD(buf);
}

return true;
}

void read_recv_each_prep(struct io_uring_sqe *sqe, struct op_ctx *ctx) {
static inline void read_recv_each_prep(struct io_uring_sqe *sqe, struct op_ctx *ctx) {
bp_ensure_commit_level(ctx->machine);
ctx->op->bp_commit_level = ctx->machine->bp_commit_level;

switch (ctx->op->kind) {
case OP_READ_MULTISHOT:
io_uring_prep_read_multishot(sqe, ctx->fd, 0, -1, ctx->bgid);
io_uring_prep_read_multishot(sqe, ctx->fd, 0, -1, BP_BGID);
return;
case OP_RECV_MULTISHOT:
io_uring_prep_recv_multishot(sqe, ctx->fd, NULL, 0, 0);
sqe->buf_group = ctx->bgid;
sqe->buf_group = BP_BGID;
sqe->flags |= IOSQE_BUFFER_SELECT;
return;
default:
return;
um_raise_internal_error("Invalid multishot op");
}
}

VALUE read_recv_each_start(VALUE arg) {
struct op_ctx *ctx = (struct op_ctx *)arg;

struct io_uring_sqe *sqe = um_get_sqe(ctx->machine, ctx->op);
read_recv_each_prep(sqe, ctx);
int total = 0;
Expand Down Expand Up @@ -1558,19 +1504,19 @@ VALUE read_recv_each_start(VALUE arg) {
return Qnil;
}

VALUE um_read_each(struct um *machine, int fd, int bgid) {
VALUE um_read_each(struct um *machine, int fd) {
struct um_op *op = um_op_acquire(machine);
um_prep_op(machine, op, OP_READ_MULTISHOT, 2, OP_F_MULTISHOT);
um_prep_op(machine, op, OP_READ_MULTISHOT, 2, OP_F_MULTISHOT | OP_F_BUFFER_POOL);

struct op_ctx ctx = { .machine = machine, .op = op, .fd = fd, .bgid = bgid, .read_buf = NULL };
struct op_ctx ctx = { .machine = machine, .op = op, .fd = fd, .read_buf = NULL };
return rb_ensure(read_recv_each_start, (VALUE)&ctx, multishot_complete, (VALUE)&ctx);
}

VALUE um_recv_each(struct um *machine, int fd, int bgid, int flags) {
VALUE um_recv_each(struct um *machine, int fd, int flags) {
struct um_op *op = um_op_acquire(machine);
um_prep_op(machine, op, OP_RECV_MULTISHOT, 2, OP_F_MULTISHOT);
um_prep_op(machine, op, OP_RECV_MULTISHOT, 2, OP_F_MULTISHOT | OP_F_BUFFER_POOL);

struct op_ctx ctx = { .machine = machine, .op = op, .fd = fd, .bgid = bgid, .read_buf = NULL, .flags = flags };
struct op_ctx ctx = { .machine = machine, .op = op, .fd = fd, .read_buf = NULL, .flags = flags };
return rb_ensure(read_recv_each_start, (VALUE)&ctx, multishot_complete, (VALUE)&ctx);
}

Expand Down
25 changes: 3 additions & 22 deletions ext/um/um.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ enum um_op_kind {
OP_RECV,
OP_RECVMSG,
OP_SEND,
OP_SEND_BUNDLE,
OP_SENDMSG,
OP_SENDV,
OP_SOCKET,
Expand Down Expand Up @@ -168,15 +167,6 @@ struct um_op {
};
};

struct buf_ring_descriptor {
struct io_uring_buf_ring *br;
size_t br_size;
unsigned buf_count;
unsigned buf_size;
unsigned buf_mask;
void *buf_base;
};

struct um_metrics {
ulong total_ops; // total ops submitted
ulong total_switches; // total fiber switches
Expand All @@ -199,8 +189,6 @@ struct um_metrics {
double time_first_cpu; // last seen time stamp
};

#define BUFFER_RING_MAX_COUNT 10

struct um {
VALUE self;

Expand All @@ -216,13 +204,9 @@ struct um {
pthread_t sidecar_thread;
uint32_t *sidecar_signal;

uint buffer_ring_count; // number of registered buffer rings

uint size; // size of SQ
uint sqpoll_mode; // SQPOLL mode enabled

struct buf_ring_descriptor buffer_rings[BUFFER_RING_MAX_COUNT];

struct um_op *transient_head; // list of pending transient ops
VALUE pending_fibers; // set containing pending fibers

Expand Down Expand Up @@ -351,9 +335,7 @@ void um_raise_on_error_result(int result);
int um_get_buffer_bytes_for_writing(VALUE buffer, const void **base, size_t *size, int raise_on_bad_buffer);
void * um_prepare_read_buffer(VALUE buffer, ssize_t len, ssize_t ofs);
void um_update_read_buffer(VALUE buffer, ssize_t buffer_offset, __s32 result);
int um_setup_buffer_ring(struct um *machine, unsigned size, unsigned count);
VALUE um_read_from_buffer_ring(struct um *machine, int bgid, __s32 result, __u32 flags);
void um_add_strings_to_buffer_ring(struct um *machine, int bgid, VALUE strings);

struct iovec *um_alloc_iovecs_for_writing(int argc, VALUE *argv, size_t *total_len);
void um_advance_iovecs_for_writing(struct iovec **ptr, int *len, size_t adv);

Expand All @@ -376,7 +358,7 @@ VALUE um_sleep(struct um *machine, double duration);
VALUE um_periodically(struct um *machine, double interval);
VALUE um_read(struct um *machine, int fd, VALUE buffer, size_t maxlen, ssize_t buffer_offset, __u64 file_offset);
size_t um_read_raw(struct um *machine, int fd, char *buffer, size_t maxlen);
VALUE um_read_each(struct um *machine, int fd, int bgid);
VALUE um_read_each(struct um *machine, int fd);
VALUE um_write(struct um *machine, int fd, VALUE buffer, size_t len, __u64 file_offset);
size_t um_write_raw(struct um *machine, int fd, const char *buffer, size_t len);
VALUE um_writev(struct um *machine, int fd, int argc, VALUE *argv);
Expand Down Expand Up @@ -405,9 +387,8 @@ VALUE um_connect(struct um *machine, int fd, const struct sockaddr *addr, sockle
VALUE um_send(struct um *machine, int fd, VALUE buffer, size_t len, int flags);
size_t um_send_raw(struct um *machine, int fd, const char *buffer, size_t len, int flags);
VALUE um_sendv(struct um *machine, int fd, int argc, VALUE *argv);
VALUE um_send_bundle(struct um *machine, int fd, int bgid, VALUE strings);
VALUE um_recv(struct um *machine, int fd, VALUE buffer, size_t maxlen, int flags);
VALUE um_recv_each(struct um *machine, int fd, int bgid, int flags);
VALUE um_recv_each(struct um *machine, int fd, int flags);
VALUE um_bind(struct um *machine, int fd, struct sockaddr *addr, socklen_t addrlen);
VALUE um_listen(struct um *machine, int fd, int backlog);
VALUE um_getsockopt(struct um *machine, int fd, int level, int opt);
Expand Down
Loading
Loading