Skip to content

Commit 398eef8

Browse files
authored
fs: support io_uring with tokio::fs::read (#7696)
1 parent c8116ec commit 398eef8

File tree

7 files changed

+435
-3
lines changed

7 files changed

+435
-3
lines changed

tokio/src/fs/mod.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -237,9 +237,6 @@ pub use self::metadata::metadata;
237237

238238
mod open_options;
239239
pub use self::open_options::OpenOptions;
240-
cfg_io_uring! {
241-
pub(crate) use self::open_options::UringOpenOptions;
242-
}
243240

244241
mod read;
245242
pub use self::read::read;
@@ -298,6 +295,13 @@ cfg_windows! {
298295
pub use self::symlink_file::symlink_file;
299296
}
300297

298+
cfg_io_uring! {
299+
pub(crate) mod read_uring;
300+
pub(crate) use self::read_uring::read_uring;
301+
302+
pub(crate) use self::open_options::UringOpenOptions;
303+
}
304+
301305
use std::io;
302306

303307
#[cfg(not(test))]

tokio/src/fs/read.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,16 @@ use std::{io, path::Path};
3030
///
3131
/// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted
3232
///
33+
/// # io_uring support
34+
///
35+
/// On Linux, you can also use io_uring for executing system calls. To enable
36+
/// io_uring, you need to specify the `--cfg tokio_unstable` flag at compile time,
37+
/// enable the io-uring cargo feature, and set the `Builder::enable_io_uring`
38+
/// runtime option.
39+
///
40+
/// Support for io_uring is currently experimental, so its behavior may change
41+
/// or it may be removed in future versions.
42+
///
3343
/// # Examples
3444
///
3545
/// ```no_run
@@ -45,5 +55,23 @@ use std::{io, path::Path};
4555
/// ```
4656
pub async fn read(path: impl AsRef<Path>) -> io::Result<Vec<u8>> {
4757
let path = path.as_ref().to_owned();
58+
59+
#[cfg(all(
60+
tokio_unstable,
61+
feature = "io-uring",
62+
feature = "rt",
63+
feature = "fs",
64+
target_os = "linux"
65+
))]
66+
{
67+
use crate::fs::read_uring;
68+
69+
let handle = crate::runtime::Handle::current();
70+
let driver_handle = handle.inner.driver().io();
71+
if driver_handle.check_and_init()? {
72+
return read_uring(&path).await;
73+
}
74+
}
75+
4876
asyncify(move || std::fs::read(path)).await
4977
}

tokio/src/fs/read_uring.rs

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
use crate::fs::OpenOptions;
2+
use crate::runtime::driver::op::Op;
3+
4+
use std::io;
5+
use std::io::ErrorKind;
6+
use std::os::fd::OwnedFd;
7+
use std::path::Path;
8+
9+
// this algorithm is inspired from rust std lib version 1.90.0
10+
// https://doc.rust-lang.org/1.90.0/src/std/io/mod.rs.html#409
11+
const PROBE_SIZE: usize = 32;
12+
const PROBE_SIZE_U32: u32 = PROBE_SIZE as u32;
13+
14+
// Max bytes we can read using io uring submission at a time
15+
// SAFETY: cannot be higher than u32::MAX for safe cast
16+
// Set to read max 64 MiB at time
17+
const MAX_READ_SIZE: usize = 64 * 1024 * 1024;
18+
19+
pub(crate) async fn read_uring(path: &Path) -> io::Result<Vec<u8>> {
20+
let file = OpenOptions::new().read(true).open(path).await?;
21+
22+
// TODO: use io uring in the future to obtain metadata
23+
let size_hint: Option<usize> = file.metadata().await.map(|m| m.len() as usize).ok();
24+
25+
let fd: OwnedFd = file
26+
.try_into_std()
27+
.expect("unexpected in-flight operation detected")
28+
.into();
29+
30+
let mut buf = Vec::new();
31+
32+
if let Some(size_hint) = size_hint {
33+
buf.try_reserve(size_hint)?;
34+
}
35+
36+
read_to_end_uring(fd, buf).await
37+
}
38+
39+
async fn read_to_end_uring(mut fd: OwnedFd, mut buf: Vec<u8>) -> io::Result<Vec<u8>> {
40+
let mut offset = 0;
41+
let start_cap = buf.capacity();
42+
43+
loop {
44+
if buf.len() == buf.capacity() && buf.capacity() == start_cap && buf.len() >= PROBE_SIZE {
45+
// The buffer might be an exact fit. Let's read into a probe buffer
46+
// and see if it returns `Ok(0)`. If so, we've avoided an
47+
// unnecessary increasing of the capacity. But if not, append the
48+
// probe buffer to the primary buffer and let its capacity grow.
49+
let (r_fd, r_buf, is_eof) = small_probe_read(fd, buf, &mut offset).await?;
50+
51+
if is_eof {
52+
return Ok(r_buf);
53+
}
54+
55+
buf = r_buf;
56+
fd = r_fd;
57+
}
58+
59+
// buf is full, need more capacity
60+
if buf.len() == buf.capacity() {
61+
buf.try_reserve(PROBE_SIZE)?;
62+
}
63+
64+
// prepare the spare capacity to be read into
65+
let buf_len = usize::min(buf.spare_capacity_mut().len(), MAX_READ_SIZE);
66+
67+
// buf_len cannot be greater than u32::MAX because MAX_READ_SIZE
68+
// is less than u32::MAX
69+
let read_len = u32::try_from(buf_len).expect("buf_len must always fit in u32");
70+
71+
// read into spare capacity
72+
let (r_fd, r_buf, is_eof) = op_read(fd, buf, &mut offset, read_len).await?;
73+
74+
if is_eof {
75+
return Ok(r_buf);
76+
}
77+
78+
fd = r_fd;
79+
buf = r_buf;
80+
}
81+
}
82+
83+
async fn small_probe_read(
84+
fd: OwnedFd,
85+
mut buf: Vec<u8>,
86+
offset: &mut u64,
87+
) -> io::Result<(OwnedFd, Vec<u8>, bool)> {
88+
let read_len = PROBE_SIZE_U32;
89+
90+
let mut temp_arr = [0; PROBE_SIZE];
91+
// we don't call this function if buffer's length < PROBE_SIZE
92+
let back_bytes_len = buf.len() - PROBE_SIZE;
93+
94+
temp_arr.copy_from_slice(&buf[back_bytes_len..]);
95+
96+
// We're decreasing the length of the buffer and len is greater
97+
// than PROBE_SIZE. So we can read into the discarded length
98+
buf.truncate(back_bytes_len);
99+
100+
let (r_fd, mut r_buf, is_eof) = op_read(fd, buf, offset, read_len).await?;
101+
// If `size_read` returns zero due to reasons such as buffer's exact fit,
102+
// then this `try_reserve` does not perform allocation.
103+
r_buf.try_reserve(PROBE_SIZE)?;
104+
r_buf.splice(back_bytes_len..back_bytes_len, temp_arr);
105+
106+
Ok((r_fd, r_buf, is_eof))
107+
}
108+
109+
// Takes a amount of length to read and returns a singluar read in the buffer
110+
//
111+
// Returns the file descriptor, buffer and EOF reached or not
112+
async fn op_read(
113+
mut fd: OwnedFd,
114+
mut buf: Vec<u8>,
115+
offset: &mut u64,
116+
read_len: u32,
117+
) -> io::Result<(OwnedFd, Vec<u8>, bool)> {
118+
loop {
119+
let (res, r_fd, r_buf) = Op::read(fd, buf, read_len, *offset).await;
120+
121+
match res {
122+
Err(e) if e.kind() == ErrorKind::Interrupted => {
123+
buf = r_buf;
124+
fd = r_fd;
125+
}
126+
Err(e) => return Err(e),
127+
Ok(size_read) => {
128+
*offset += size_read as u64;
129+
130+
return Ok((r_fd, r_buf, size_read == 0));
131+
}
132+
}
133+
}
134+
}

tokio/src/io/uring/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
pub(crate) mod open;
2+
pub(crate) mod read;
23
pub(crate) mod utils;
34
pub(crate) mod write;

tokio/src/io/uring/read.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op};
2+
3+
use io_uring::{opcode, types};
4+
use std::io::{self, Error};
5+
use std::os::fd::{AsRawFd, OwnedFd};
6+
7+
#[derive(Debug)]
8+
pub(crate) struct Read {
9+
fd: OwnedFd,
10+
buf: Vec<u8>,
11+
}
12+
13+
impl Completable for Read {
14+
type Output = (io::Result<u32>, OwnedFd, Vec<u8>);
15+
16+
fn complete(self, cqe: CqeResult) -> Self::Output {
17+
let mut buf = self.buf;
18+
19+
if let Ok(len) = cqe.result {
20+
let new_len = buf.len() + len as usize;
21+
// SAFETY: Kernel read len bytes
22+
unsafe { buf.set_len(new_len) };
23+
}
24+
25+
(cqe.result, self.fd, buf)
26+
}
27+
28+
fn complete_with_error(self, err: Error) -> Self::Output {
29+
(Err(err), self.fd, self.buf)
30+
}
31+
}
32+
33+
impl Cancellable for Read {
34+
fn cancel(self) -> CancelData {
35+
CancelData::Read(self)
36+
}
37+
}
38+
39+
impl Op<Read> {
40+
// Submit a request to read a FD at given length and offset into a
41+
// dynamic buffer with uninitialized memory. The read happens on uninitialized
42+
// buffer and no overwriting happens.
43+
44+
// SAFETY: The `len` of the amount to be read and the buffer that is passed
45+
// should have capacity > len.
46+
//
47+
// If `len` read is higher than vector capacity then setting its length by
48+
// the caller in terms of size_read can be unsound.
49+
pub(crate) fn read(fd: OwnedFd, mut buf: Vec<u8>, len: u32, offset: u64) -> Self {
50+
// don't overwrite on already written part
51+
assert!(buf.spare_capacity_mut().len() >= len as usize);
52+
let buf_mut_ptr = buf.spare_capacity_mut().as_mut_ptr().cast();
53+
54+
let read_op = opcode::Read::new(types::Fd(fd.as_raw_fd()), buf_mut_ptr, len)
55+
.offset(offset)
56+
.build();
57+
58+
// SAFETY: Parameters are valid for the entire duration of the operation
59+
unsafe { Op::new(read_op, Read { fd, buf }) }
60+
}
61+
}

tokio/src/runtime/driver/op.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::io::uring::open::Open;
2+
use crate::io::uring::read::Read;
23
use crate::io::uring::write::Write;
34
use crate::runtime::Handle;
45

@@ -17,6 +18,7 @@ use std::task::{Context, Poll, Waker};
1718
pub(crate) enum CancelData {
1819
Open(Open),
1920
Write(Write),
21+
Read(Read),
2022
}
2123

2224
#[derive(Debug)]

0 commit comments

Comments
 (0)