From ff709bdcfb39ee9c78bf9f0a04cc8b3c263d4ac5 Mon Sep 17 00:00:00 2001 From: Sharon Rosner Date: Sun, 12 Apr 2026 12:57:59 +0200 Subject: [PATCH] Reimplement read_each, recv_each using buffer pool, remove #send_bundle, #setup_buffer_group --- TODO.md | 62 +++++++++++++++++++----------- docs/um_api.md | 2 - ext/um/um.c | 96 ++++++++++------------------------------------ ext/um/um.h | 25 ++---------- ext/um/um_class.c | 74 ++++++----------------------------- ext/um/um_io.c | 1 - ext/um/um_op.c | 1 - ext/um/um_utils.c | 86 ----------------------------------------- test/test_um.rb | 98 +++++------------------------------------------ 9 files changed, 85 insertions(+), 360 deletions(-) diff --git a/TODO.md b/TODO.md index 9eb8954..1fafba8 100644 --- a/TODO.md +++ b/TODO.md @@ -1,28 +1,13 @@ -- Rename Connection to IO - - ```ruby - io = machine.io(fd) - l = io.read_line(4) - io.write('foo') - ``` - -- Add `IO#fd`/`IO#target` method - -- Add `UM#inspect` (show size, modes) -- Add `UM::IO#inspect` (show target, mode, pending bytes) - -## Reimplement multishot read/recv using buffer pool - -- remove `#setup_buffer_ring` method -- use buffer pool, just like UM::Connection - ## immediate +- Add `IO#http_xxx` methods + - `#http_read_request_headers()` + - `#http_read_body(content_length)` (-1 means chunked TE) + - Add tests for support for Set in `machine#await` - Add tests for support for Set, Array in `machine#join` - Add `UM#read_file` for reading entire file - Add `UM#write_file` for writing entire file -- Rename stream methods: `:fd`, `:socket`, `:ssl` ## Balancing I/O with the runqueue @@ -48,7 +33,7 @@ - debounce ```ruby - debouncer = machine.debounce { } + debouncer = machine.debounce { ... } ``` - happy eyeballs algo for TCP connect @@ -104,8 +89,6 @@ When doing a `call`, we need to provide a mailbox for the response. can this be automatic? -## - ## Syntax / pattern for launching/supervising multiple operations Select (see above): @@ -125,3 +108,38 @@ machine.shift_select(*queues) #=> [result, queue] # ['1.1.1.1:80', '2.2.2.2:80'] tcp_connect_he(*addrs) ``` + +## Character scanning in UM::IO + +```c +// bitmaps for character types can be generated with a bit of Ruby: +// +// def t(r); (0..255).map { [it].pack('c') =~ r ? 1 : 0 }; end +// def tn(r); (0..255).map { [it].pack('c') =~ r ? 0 : 1 }; end +// def u64(bits); bits.reverse.join.to_i(2); end +// def p(a); a.each_slice(64).map { u64(it) }; end + +// usage: +// +// p(t(/[a-zA-Z0-9]/)).map { format('%016X', it) } + + +// /[a-zA-Z0-9]/ +uint64_t alpha_numeric[] = [ + 0x000000000000FFC0, + 0x7FFFFFE07FFFFFE0, + 0x0000000000000000, + 0x0000000000000000 +]; + +// HTTP method: /[a-zA-Z]/ (3-12 characters) +// header-key: /[a-zA-Z\-]/ () +// path: /^($/ + +// check if character is in bitmap +inline int test_char(char c, uint64 *bitmap) { + return bitmap[c / 64] & (1UL << (c % 64)); +} + + +``` diff --git a/docs/um_api.md b/docs/um_api.md index 5e06c37..a8d429b 100644 --- a/docs/um_api.md +++ b/docs/um_api.md @@ -56,8 +56,6 @@ resume value. - `select(rfds, wfds, efds)` - selects ready fds from the given readable, writable and exeptable fds. -- `send_bundle(fd, bgid, *strings)` - sends a bundle of buffers to the given fd - using the given buffer group id. - `send(fd, buffer, len, flags)` - sends to the given fd from the given buffer. - `sendv(fd, *buffers)` - sends multiple buffers to the given fd. - `setsockopt(fd, level, opt, value)` - sets a socket option. diff --git a/ext/um/um.c b/ext/um/um.c index 829a320..54d5576 100644 --- a/ext/um/um.c +++ b/ext/um/um.c @@ -49,12 +49,6 @@ inline void um_teardown(struct um *machine) { if (machine->sidecar_mode) um_sidecar_teardown(machine); if (machine->sidecar_signal) free(machine->sidecar_signal); - for (unsigned i = 0; i < machine->buffer_ring_count; i++) { - struct buf_ring_descriptor *desc = machine->buffer_rings + i; - io_uring_free_buf_ring(&machine->ring, desc->br, desc->buf_count, i); - free(desc->buf_base); - } - machine->buffer_ring_count = 0; io_uring_queue_exit(&machine->ring); machine->ring_initialized = 0; @@ -523,7 +517,6 @@ struct op_ctx { struct um *machine; struct um_op *op; int fd; - int bgid; struct um_queue *queue; void *read_buf; @@ -870,25 +863,6 @@ VALUE um_sendv(struct um *machine, int fd, int argc, VALUE *argv) { return ret; } -VALUE um_send_bundle(struct um *machine, int fd, int bgid, VALUE strings) { - um_add_strings_to_buffer_ring(machine, bgid, strings); - struct um_op *op = um_op_acquire(machine); - um_prep_op(machine, op, OP_SEND_BUNDLE, 2, 0); - struct io_uring_sqe *sqe = um_get_sqe(machine, op); - io_uring_prep_send_bundle(sqe, fd, 0, MSG_NOSIGNAL | MSG_WAITALL); - sqe->flags |= IOSQE_BUFFER_SELECT; - sqe->buf_group = bgid; - - VALUE ret = um_yield(machine); - - if (likely(um_verify_op_completion(machine, op, true))) ret = INT2NUM(op->result.res); - um_op_release(machine, op); - - RAISE_IF_EXCEPTION(ret); - RB_GC_GUARD(ret); - return ret; -} - VALUE um_recv(struct um *machine, int fd, VALUE buffer, size_t maxlen, int flags) { void *ptr = um_prepare_read_buffer(buffer, maxlen, 0); struct um_op *op = um_op_acquire(machine); @@ -1461,72 +1435,44 @@ VALUE um_accept_into_queue(struct um *machine, int fd, VALUE queue) { return rb_ensure(accept_into_queue_start, (VALUE)&ctx, multishot_complete, (VALUE)&ctx); } -int um_read_each_singleshot_loop(struct op_ctx *ctx) { - struct buf_ring_descriptor *desc = ctx->machine->buffer_rings + ctx->bgid; - ctx->read_maxlen = desc->buf_size; - ctx->read_buf = malloc(desc->buf_size); - int total = 0; - - while (1) { - um_prep_op(ctx->machine, ctx->op, OP_READ, 2, 0); - struct io_uring_sqe *sqe = um_get_sqe(ctx->machine, ctx->op); - io_uring_prep_read(sqe, ctx->fd, ctx->read_buf, ctx->read_maxlen, -1); - - VALUE ret = um_yield(ctx->machine); - - if (likely(um_verify_op_completion(ctx->machine, ctx->op, true))) { - VALUE buf = rb_str_new(ctx->read_buf, ctx->op->result.res); - total += ctx->op->result.res; - rb_yield(buf); - RB_GC_GUARD(buf); - } - else { - RAISE_IF_EXCEPTION(ret); - return 0; - } - RB_GC_GUARD(ret); - } - return 0; -} - // // returns true if more results are expected -int read_recv_each_multishot_process_result(struct op_ctx *ctx, struct um_op_result *result, int *total) { +inline int read_recv_each_multishot_process_result(struct op_ctx *ctx, struct um_op_result *result, int *total) { if (result->res == 0) return false; *total += result->res; - VALUE buf = um_read_from_buffer_ring(ctx->machine, ctx->bgid, result->res, result->flags); - rb_yield(buf); - RB_GC_GUARD(buf); - - // TTY devices might not support multishot reads: - // https://github.com/axboe/liburing/issues/1185. We detect this by checking - // if the F_MORE flag is absent, then switch to single shot mode. - if (unlikely(!(result->flags & IORING_CQE_F_MORE))) { - *total += um_read_each_singleshot_loop(ctx); - return false; + if (likely(result->segment)) { + VALUE buf = rb_str_new(result->segment->ptr, result->segment->len); + um_segment_checkin(ctx->machine, result->segment); + result->segment = NULL; + rb_yield(buf); + RB_GC_GUARD(buf); } return true; } -void read_recv_each_prep(struct io_uring_sqe *sqe, struct op_ctx *ctx) { +static inline void read_recv_each_prep(struct io_uring_sqe *sqe, struct op_ctx *ctx) { + bp_ensure_commit_level(ctx->machine); + ctx->op->bp_commit_level = ctx->machine->bp_commit_level; + switch (ctx->op->kind) { case OP_READ_MULTISHOT: - io_uring_prep_read_multishot(sqe, ctx->fd, 0, -1, ctx->bgid); + io_uring_prep_read_multishot(sqe, ctx->fd, 0, -1, BP_BGID); return; case OP_RECV_MULTISHOT: io_uring_prep_recv_multishot(sqe, ctx->fd, NULL, 0, 0); - sqe->buf_group = ctx->bgid; + sqe->buf_group = BP_BGID; sqe->flags |= IOSQE_BUFFER_SELECT; return; default: - return; + um_raise_internal_error("Invalid multishot op"); } } VALUE read_recv_each_start(VALUE arg) { struct op_ctx *ctx = (struct op_ctx *)arg; + struct io_uring_sqe *sqe = um_get_sqe(ctx->machine, ctx->op); read_recv_each_prep(sqe, ctx); int total = 0; @@ -1558,19 +1504,19 @@ VALUE read_recv_each_start(VALUE arg) { return Qnil; } -VALUE um_read_each(struct um *machine, int fd, int bgid) { +VALUE um_read_each(struct um *machine, int fd) { struct um_op *op = um_op_acquire(machine); - um_prep_op(machine, op, OP_READ_MULTISHOT, 2, OP_F_MULTISHOT); + um_prep_op(machine, op, OP_READ_MULTISHOT, 2, OP_F_MULTISHOT | OP_F_BUFFER_POOL); - struct op_ctx ctx = { .machine = machine, .op = op, .fd = fd, .bgid = bgid, .read_buf = NULL }; + struct op_ctx ctx = { .machine = machine, .op = op, .fd = fd, .read_buf = NULL }; return rb_ensure(read_recv_each_start, (VALUE)&ctx, multishot_complete, (VALUE)&ctx); } -VALUE um_recv_each(struct um *machine, int fd, int bgid, int flags) { +VALUE um_recv_each(struct um *machine, int fd, int flags) { struct um_op *op = um_op_acquire(machine); - um_prep_op(machine, op, OP_RECV_MULTISHOT, 2, OP_F_MULTISHOT); + um_prep_op(machine, op, OP_RECV_MULTISHOT, 2, OP_F_MULTISHOT | OP_F_BUFFER_POOL); - struct op_ctx ctx = { .machine = machine, .op = op, .fd = fd, .bgid = bgid, .read_buf = NULL, .flags = flags }; + struct op_ctx ctx = { .machine = machine, .op = op, .fd = fd, .read_buf = NULL, .flags = flags }; return rb_ensure(read_recv_each_start, (VALUE)&ctx, multishot_complete, (VALUE)&ctx); } diff --git a/ext/um/um.h b/ext/um/um.h index 3b09f64..688281f 100644 --- a/ext/um/um.h +++ b/ext/um/um.h @@ -56,7 +56,6 @@ enum um_op_kind { OP_RECV, OP_RECVMSG, OP_SEND, - OP_SEND_BUNDLE, OP_SENDMSG, OP_SENDV, OP_SOCKET, @@ -168,15 +167,6 @@ struct um_op { }; }; -struct buf_ring_descriptor { - struct io_uring_buf_ring *br; - size_t br_size; - unsigned buf_count; - unsigned buf_size; - unsigned buf_mask; - void *buf_base; -}; - struct um_metrics { ulong total_ops; // total ops submitted ulong total_switches; // total fiber switches @@ -199,8 +189,6 @@ struct um_metrics { double time_first_cpu; // last seen time stamp }; -#define BUFFER_RING_MAX_COUNT 10 - struct um { VALUE self; @@ -216,13 +204,9 @@ struct um { pthread_t sidecar_thread; uint32_t *sidecar_signal; - uint buffer_ring_count; // number of registered buffer rings - uint size; // size of SQ uint sqpoll_mode; // SQPOLL mode enabled - struct buf_ring_descriptor buffer_rings[BUFFER_RING_MAX_COUNT]; - struct um_op *transient_head; // list of pending transient ops VALUE pending_fibers; // set containing pending fibers @@ -351,9 +335,7 @@ void um_raise_on_error_result(int result); int um_get_buffer_bytes_for_writing(VALUE buffer, const void **base, size_t *size, int raise_on_bad_buffer); void * um_prepare_read_buffer(VALUE buffer, ssize_t len, ssize_t ofs); void um_update_read_buffer(VALUE buffer, ssize_t buffer_offset, __s32 result); -int um_setup_buffer_ring(struct um *machine, unsigned size, unsigned count); -VALUE um_read_from_buffer_ring(struct um *machine, int bgid, __s32 result, __u32 flags); -void um_add_strings_to_buffer_ring(struct um *machine, int bgid, VALUE strings); + struct iovec *um_alloc_iovecs_for_writing(int argc, VALUE *argv, size_t *total_len); void um_advance_iovecs_for_writing(struct iovec **ptr, int *len, size_t adv); @@ -376,7 +358,7 @@ VALUE um_sleep(struct um *machine, double duration); VALUE um_periodically(struct um *machine, double interval); VALUE um_read(struct um *machine, int fd, VALUE buffer, size_t maxlen, ssize_t buffer_offset, __u64 file_offset); size_t um_read_raw(struct um *machine, int fd, char *buffer, size_t maxlen); -VALUE um_read_each(struct um *machine, int fd, int bgid); +VALUE um_read_each(struct um *machine, int fd); VALUE um_write(struct um *machine, int fd, VALUE buffer, size_t len, __u64 file_offset); size_t um_write_raw(struct um *machine, int fd, const char *buffer, size_t len); VALUE um_writev(struct um *machine, int fd, int argc, VALUE *argv); @@ -405,9 +387,8 @@ VALUE um_connect(struct um *machine, int fd, const struct sockaddr *addr, sockle VALUE um_send(struct um *machine, int fd, VALUE buffer, size_t len, int flags); size_t um_send_raw(struct um *machine, int fd, const char *buffer, size_t len, int flags); VALUE um_sendv(struct um *machine, int fd, int argc, VALUE *argv); -VALUE um_send_bundle(struct um *machine, int fd, int bgid, VALUE strings); VALUE um_recv(struct um *machine, int fd, VALUE buffer, size_t maxlen, int flags); -VALUE um_recv_each(struct um *machine, int fd, int bgid, int flags); +VALUE um_recv_each(struct um *machine, int fd, int flags); VALUE um_bind(struct um *machine, int fd, struct sockaddr *addr, socklen_t addrlen); VALUE um_listen(struct um *machine, int fd, int backlog); VALUE um_getsockopt(struct um *machine, int fd, int level, int opt); diff --git a/ext/um/um_class.c b/ext/um/um_class.c index dfad647..5743890 100644 --- a/ext/um/um_class.c +++ b/ext/um/um_class.c @@ -129,18 +129,6 @@ VALUE UM_initialize(int argc, VALUE *argv, VALUE self) { return self; } -/* Creates a buffer group (buffer ring) with the given buffer size and buffer count. - * - * @param size [Integer] buffer size in bytes - * @param count [Integer] number of buffers in group - * @return [Integer] buffer group id - */ -VALUE UM_setup_buffer_ring(VALUE self, VALUE size, VALUE count) { - struct um *machine = um_get_machine(self); - int bgid = um_setup_buffer_ring(machine, NUM2UINT(size), NUM2UINT(count)); - return INT2NUM(bgid); -} - /* Returns the SQ (submission queue) size. * * @return [Integer] SQ size @@ -408,21 +396,19 @@ VALUE UM_read(int argc, VALUE *argv, VALUE self) { } /* call-seq: - * machine.read_each(fd, bgid) { |data| } + * machine.read_each(fd) { |data| } * - * Reads repeatedly from the given `fd` using the given buffer group id. The - * buffer group should have been previously setup using `#setup_buffer_ring`. - * Read data is yielded in an infinite loop to the given block. + * Reads repeatedly from the given fd. Read data is yielded in an infinite + * loop to the given block. * * - https://www.man7.org/linux/man-pages/man3/io_uring_prep_read_multishot.3.html * * @param fd [Integer] file descriptor - * @param bgid [Integer] buffer group id * @return [void] */ -VALUE UM_read_each(VALUE self, VALUE fd, VALUE bgid) { +VALUE UM_read_each(VALUE self, VALUE fd) { struct um *machine = um_get_machine(self); - return um_read_each(machine, NUM2INT(fd), NUM2INT(bgid)); + return um_read_each(machine, NUM2INT(fd)); } /* call-seq: @@ -836,38 +822,6 @@ VALUE UM_sendv(int argc, VALUE *argv, VALUE self) { } -/* call-seq: - * machine.send_bundle(fd, bgid, *buffers) -> bytes_sent - * - * Sends data on the given socket from the given buffers using a registered - * buffer group. The buffer group should have been previously registered using - * `#setup_buffer_ring`. - * - * - https://www.man7.org/linux/man-pages/man2/send.2.html - * - https://www.man7.org/linux/man-pages/man3/io_uring_prep_send.3.html - * - * @overload send_bundle(fd, bgid, *buffers) - * @param fd [Integer] file descriptor - * @param bgid [Integer] buffer group id - * @param *buffers [Array] buffers - * @return [Integer] number of bytes sent - */ -VALUE UM_send_bundle(int argc, VALUE *argv, VALUE self) { - struct um *machine = um_get_machine(self); - VALUE fd; - VALUE bgid; - VALUE strings; - rb_scan_args(argc, argv, "2*", &fd, &bgid, &strings); - - if (RARRAY_LEN(strings) == 1) { - VALUE first = rb_ary_entry(strings, 0); - if (TYPE(first) == T_ARRAY) - strings = first; - } - - return um_send_bundle(machine, NUM2INT(fd), NUM2INT(bgid), strings); -} - /* call-seq: * machine.recv(fd, buffer, maxlen, flags) -> bytes_received * @@ -888,23 +842,20 @@ VALUE UM_recv(VALUE self, VALUE fd, VALUE buffer, VALUE maxlen, VALUE flags) { } /* call-seq: - * machine.recv_each(fd, bgid, flags) { |data| ... } + * machine.recv_each(fd, flags) { |data| ... } * - * Repeatedlty receives data from the given socket in an infinite loop using the - * given buffer group id. The buffer group should have been previously setup - * using `#setup_buffer_ring`. + * Repeatedlty receives data from the given socket in an infinite loop. * * - https://www.man7.org/linux/man-pages/man2/recv.2.html * - https://www.man7.org/linux/man-pages/man3/io_uring_prep_recv.3.html * * @param fd [Integer] file descriptor - * @param bgid [Integer] buffer group id * @param flags [Integer] flags mask * @return [void] */ -VALUE UM_recv_each(VALUE self, VALUE fd, VALUE bgid, VALUE flags) { +VALUE UM_recv_each(VALUE self, VALUE fd, VALUE flags) { struct um *machine = um_get_machine(self); - return um_recv_each(machine, NUM2INT(fd), NUM2INT(bgid), NUM2INT(flags)); + return um_recv_each(machine, NUM2INT(fd), NUM2INT(flags)); } /* call-seq: @@ -1512,8 +1463,6 @@ void Init_UM(void) { rb_define_method(cUM, "sidecar_start", UM_sidecar_start, 0); rb_define_method(cUM, "sidecar_stop", UM_sidecar_stop, 0); - rb_define_method(cUM, "setup_buffer_ring", UM_setup_buffer_ring, 2); - rb_define_method(cUM, "schedule", UM_schedule, 2); rb_define_method(cUM, "snooze", UM_snooze, 0); rb_define_method(cUM, "timeout", UM_timeout, 2); @@ -1527,7 +1476,7 @@ void Init_UM(void) { rb_define_method(cUM, "close_async", UM_close_async, 1); rb_define_method(cUM, "open", UM_open, 2); rb_define_method(cUM, "read", UM_read, -1); - rb_define_method(cUM, "read_each", UM_read_each, 2); + rb_define_method(cUM, "read_each", UM_read_each, 1); rb_define_method(cUM, "sleep", UM_sleep, 1); rb_define_method(cUM, "periodically", UM_periodically, 1); rb_define_method(cUM, "write", UM_write, -1); @@ -1554,11 +1503,10 @@ void Init_UM(void) { rb_define_method(cUM, "getsockopt", UM_getsockopt, 3); rb_define_method(cUM, "listen", UM_listen, 2); rb_define_method(cUM, "recv", UM_recv, 4); - rb_define_method(cUM, "recv_each", UM_recv_each, 3); + rb_define_method(cUM, "recv_each", UM_recv_each, 2); rb_define_method(cUM, "send", UM_send, 4); rb_define_method(cUM, "sendv", UM_sendv, -1); - rb_define_method(cUM, "send_bundle", UM_send_bundle, -1); rb_define_method(cUM, "setsockopt", UM_setsockopt, 4); rb_define_method(cUM, "socket", UM_socket, 4); rb_define_method(cUM, "shutdown", UM_shutdown, 2); diff --git a/ext/um/um_io.c b/ext/um/um_io.c index c7271e8..2ba21dc 100644 --- a/ext/um/um_io.c +++ b/ext/um/um_io.c @@ -177,7 +177,6 @@ int io_get_more_segments_bp(struct um_io *io) { um_op_release(io->machine, io->op); io->op = NULL; - // io_multishot_op_start(io); } else { um_op_release(io->machine, io->op); diff --git a/ext/um/um_op.c b/ext/um/um_op.c index fe60b51..2dbe2fa 100644 --- a/ext/um/um_op.c +++ b/ext/um/um_op.c @@ -20,7 +20,6 @@ const char * um_op_kind_name(enum um_op_kind kind) { case OP_RECV: return "OP_RECV"; case OP_RECVMSG: return "OP_RECVMSG"; case OP_SEND: return "OP_SEND"; - case OP_SEND_BUNDLE: return "OP_SEND_BUNDLE"; case OP_SENDMSG: return "OP_SENDMSG"; case OP_SENDV: return "OP_SENDV"; case OP_SOCKET: return "OP_SOCKET"; diff --git a/ext/um/um_utils.c b/ext/um/um_utils.c index cd86b61..c4f2cfd 100644 --- a/ext/um/um_utils.c +++ b/ext/um/um_utils.c @@ -115,92 +115,6 @@ inline int um_get_buffer_bytes_for_writing(VALUE buffer, const void **base, size return true; } -int um_setup_buffer_ring(struct um *machine, unsigned size, unsigned count) { - if (machine->buffer_ring_count == BUFFER_RING_MAX_COUNT) - um_raise_internal_error("Cannot setup more than BUFFER_RING_MAX_COUNT buffer rings"); - - struct buf_ring_descriptor *desc = machine->buffer_rings + machine->buffer_ring_count; - desc->buf_count = count; - desc->buf_size = size; - desc->br_size = sizeof(struct io_uring_buf) * desc->buf_count; - desc->buf_mask = io_uring_buf_ring_mask(desc->buf_count); - - void *mapped = mmap( - NULL, desc->br_size, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, 0, 0 - ); - if (mapped == MAP_FAILED) - um_raise_internal_error("Failed to allocate buffer ring"); - - desc->br = (struct io_uring_buf_ring *)mapped; - io_uring_buf_ring_init(desc->br); - - unsigned bg_id = machine->buffer_ring_count; - int ret; - desc->br = io_uring_setup_buf_ring(&machine->ring, count, bg_id, 0, &ret); - if (!desc->br) { - munmap(desc->br, desc->br_size); - rb_syserr_fail(-ret, strerror(-ret)); - } - - if (size > 0) { - if (posix_memalign(&desc->buf_base, 4096, desc->buf_count * desc->buf_size)) { - io_uring_free_buf_ring(&machine->ring, desc->br, desc->buf_count, bg_id); - um_raise_internal_error("Failed to allocate buffers"); - } - - char *ptr = desc->buf_base; - for (unsigned i = 0; i < desc->buf_count; i++) { - io_uring_buf_ring_add(desc->br, ptr, desc->buf_size, i, desc->buf_mask, i); - ptr += desc->buf_size; - } - io_uring_buf_ring_advance(desc->br, desc->buf_count); - } - machine->buffer_ring_count++; - return bg_id; -} - -inline VALUE um_read_from_buffer_ring(struct um *machine, int bgid, __s32 result, __u32 flags) { - if (!result) return Qnil; - - unsigned buf_idx = flags >> IORING_CQE_BUFFER_SHIFT; - struct buf_ring_descriptor *desc = machine->buffer_rings + bgid; - char *src = (char *)desc->buf_base + desc->buf_size * buf_idx; - // TODO: add support for UTF8 - // buf = rd->utf8_encoding ? rb_utf8_str_new(src, cqe->res) : rb_str_new(src, cqe->res); - VALUE buf = rb_str_new(src, result); - - // add buffer back to buffer ring - io_uring_buf_ring_add( - desc->br, src, desc->buf_size, buf_idx, desc->buf_mask, 0 - ); - io_uring_buf_ring_advance(desc->br, 1); - - RB_GC_GUARD(buf); - return buf; -} - -inline void um_add_strings_to_buffer_ring(struct um *machine, int bgid, VALUE strings) { - static ID ID_to_s = 0; - - struct buf_ring_descriptor *desc = machine->buffer_rings + bgid; - ulong count = RARRAY_LEN(strings); - VALUE str = Qnil; - VALUE converted = Qnil; - - for (ulong i = 0; i < count; i++) { - str = rb_ary_entry(strings, i); - if (TYPE(str) != T_STRING) { - if (!ID_to_s) ID_to_s = rb_intern("to_s"); - if (NIL_P(converted)) converted = rb_ary_new(); - str = rb_funcall(str, ID_to_s, 0); - rb_ary_push(converted, str); - } - io_uring_buf_ring_add(desc->br, RSTRING_PTR(str), RSTRING_LEN(str), i, desc->buf_mask, i); - } - RB_GC_GUARD(converted); - io_uring_buf_ring_advance(desc->br, count); -} - inline void um_raise_internal_error(const char *msg) { rb_raise(eUMError, "UringMachine error: %s", msg); } diff --git a/test/test_um.rb b/test/test_um.rb index b937bc6..6aa1eab 100644 --- a/test/test_um.rb +++ b/test/test_um.rb @@ -653,8 +653,6 @@ class ReadEachTest < UMBaseTest def test_read_each r, w = IO.pipe bufs = [] - bgid = machine.setup_buffer_ring(4096, 1024) - assert_equal 0, bgid f = Fiber.new do w << 'foo' @@ -669,7 +667,7 @@ def test_read_each machine.schedule(f, nil) - machine.read_each(r.fileno, bgid) do |buf| + machine.read_each(r.fileno) do |buf| bufs << buf end @@ -681,15 +679,13 @@ def test_read_each # send once and close write fd def test_read_each_raising_1 r, w = IO.pipe - bgid = machine.setup_buffer_ring(4096, 1024) - assert_equal 0, bgid w << 'foo' w.close e = nil begin - machine.read_each(r.fileno, bgid) do |buf| + machine.read_each(r.fileno) do |buf| raise 'hi' end rescue => e @@ -704,14 +700,12 @@ def test_read_each_raising_1 # send once and leave write fd open def test_read_each_raising_2 r, w = IO.pipe - bgid = machine.setup_buffer_ring(4096, 1024) - assert_equal 0, bgid w << 'foo' e = nil begin - machine.read_each(r.fileno, bgid) do |buf| + machine.read_each(r.fileno) do |buf| raise 'hi' end rescue => e @@ -728,15 +722,13 @@ def test_read_each_raising_2 # send twice def test_read_each_raising_3 r, w = IO.pipe - bgid = machine.setup_buffer_ring(4096, 1024) - assert_equal 0, bgid w << 'foo' w << 'bar' e = nil begin - machine.read_each(r.fileno, bgid) do |buf| + machine.read_each(r.fileno) do |buf| raise 'hi' end rescue => e @@ -752,7 +744,6 @@ def test_read_each_raising_3 def test_read_each_break r, w = IO.pipe - bgid = machine.setup_buffer_ring(4096, 1024) t = Thread.new do sleep 0.1 @@ -762,7 +753,7 @@ def test_read_each_break end bufs = [] - machine.read_each(r.fileno, bgid) do |b| + machine.read_each(r.fileno) do |b| bufs << b break end @@ -779,13 +770,12 @@ class TOError < StandardError; end def test_read_each_timeout r, _w = IO.pipe - bgid = machine.setup_buffer_ring(4096, 1024) bufs = [] e = nil begin machine.timeout(0.01, TOError) do - machine.read_each(r.fileno, bgid) do |b| + machine.read_each(r.fileno) do |b| bufs << b end end @@ -804,10 +794,9 @@ def test_read_each_close def test_read_each_bad_file _r, w = IO.pipe - bgid = machine.setup_buffer_ring(4096, 1024) assert_raises(Errno::EBADF) do - machine.read_each(w.fileno, bgid) + machine.read_each(w.fileno) end assert_equal 0, machine.metrics[:ops_pending] assert_equal 256, machine.metrics[:ops_free] @@ -1892,15 +1881,9 @@ def test_recv_each res = machine.connect(fd, '127.0.0.1', @port) assert_equal 0, res - bgid = machine.setup_buffer_ring(4096, 1024) - assert_equal 0, bgid - - bgid2 = machine.setup_buffer_ring(4096, 1024) - assert_equal 1, bgid2 - bufs = [] - machine.recv_each(fd, bgid, 0) do |buf| + machine.recv_each(fd, 0) do |buf| bufs << buf end assert_equal ['abc', 'def', 'ghi'], bufs @@ -1922,14 +1905,11 @@ def test_recv_each_timeout res = machine.connect(fd, '127.0.0.1', @port) assert_equal 0, res - bgid = machine.setup_buffer_ring(4096, 1024) - assert_equal 0, bgid - bufs = [] e = nil begin machine.timeout(0.01, TOError) do - machine.recv_each(fd, bgid, 0) do |buf| + machine.recv_each(fd, 0) do |buf| bufs << buf end end @@ -1953,9 +1933,6 @@ def test_recv_each_shutdown res = machine.connect(fd, '127.0.0.1', @port) assert_equal 0, res - bgid = machine.setup_buffer_ring(4096, 1024) - assert_equal 0, bgid - bufs = [] e = nil @@ -1965,7 +1942,7 @@ def test_recv_each_shutdown } begin - machine.recv_each(fd, bgid, 0) do |buf| + machine.recv_each(fd, 0) do |buf| bufs << buf end rescue => e @@ -3097,61 +3074,6 @@ def test_fork end end -class SendBundleTest < UMBaseTest - def setup - super - @client_fd, @server_fd = UM.socketpair(UM::AF_UNIX, UM::SOCK_STREAM, 0) - end - - def test_send_bundle_splat - bgid = machine.setup_buffer_ring(0, 8) - assert_equal 0, bgid - - strs = ['foo', 'bar', 'bazzzzz'] - len = strs.inject(0) { |len, s| len + s.bytesize } - - ret = machine.send_bundle(@client_fd, bgid, *strs) - assert_equal len, ret - - buf = +'' - ret = machine.recv(@server_fd, buf, 8192, 0) - assert_equal len, ret - assert_equal strs.join, buf - end - - def test_send_bundle_array - bgid = machine.setup_buffer_ring(0, 8) - assert_equal 0, bgid - - strs = ['foo', 'bar', 'bazzzzz'] - len = strs.inject(0) { |len, s| len + s.bytesize } - - ret = machine.send_bundle(@client_fd, bgid, strs) - assert_equal len, ret - - buf = +'' - ret = machine.recv(@server_fd, buf, 8192, 0) - assert_equal len, ret - assert_equal strs.join, buf - end - - def test_send_bundle_non_strings - bgid = machine.setup_buffer_ring(0, 8) - assert_equal 0, bgid - - strs = [42, 'bar', false] - len = strs.inject(0) { |len, s| len + s.to_s.bytesize } - - ret = machine.send_bundle(@client_fd, bgid, strs) - assert_equal len, ret - - buf = +'' - ret = machine.recv(@server_fd, buf, 8192, 0) - assert_equal len, ret - assert_equal strs.map(&:to_s).join, buf - end -end - class MetricsTest < UMBaseTest def test_metrics_empty assert_equal({