Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ edition = "2021"
description = "Bufferering types for embedded-io"
readme = "README.md"
repository = "https://github.com/rmja/buffered-io"
authors = ["Rasmus Melchior Jacobsen <rmja@laesoe.org>"]
authors = ["Rasmus Melchior Jacobsen <rmja@laesoe.org>", "Tage Johansson"]
license = "MIT / Apache-2.0"
keywords = ["embedded", "buffer", "embedded-io", "read", "write"]
exclude = [".github"]

[features]
async = ["dep:embedded-io-async"]

[dependencies]
embedded-io = { version = "0.7" }
embedded-io-async = { version = "0.7" }
embedded-io-async = { version = "0.7", optional = true }

[dev-dependencies]
embedded-io-async = { version = "0.7", features = ["std"] }
Expand Down
24 changes: 11 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,14 @@ The `buffered-io` crate implements buffering for the `embedded-io`/`embedded-io-
## Example

```rust
tokio_test::block_on(async {
use buffered_io::asynch::BufferedWrite;
use embedded_io_async::Write;

let uart_tx = Vec::new(); // The underlying uart peripheral implementing Write to where buffered bytes are written
let mut write_buf = [0; 120];
let mut buffering = BufferedWrite::new(uart_tx, &mut write_buf);
buffering.write(b"hello").await.unwrap(); // This write is buffered
buffering.write(b" ").await.unwrap(); // This write is also buffered
buffering.write(b"world").await.unwrap(); // This write is also buffered
buffering.flush().await.unwrap(); // The string "hello world" is written to uart in one write
})
```
use buffered_io::BufferedWrite;
use embedded_io::Write;

let uart_tx = Vec::new(); // The underlying uart peripheral implementing Write to where buffered bytes are written
let mut write_buf = [0; 120];
let mut buffering = BufferedWrite::new(uart_tx, &mut write_buf);
buffering.write(b"hello").unwrap(); // This write is buffered
buffering.write(b" ").unwrap(); // This write is also buffered
buffering.write(b"world").unwrap(); // This write is also buffered
buffering.flush().unwrap(); // The string "hello world" is written to uart in one write
```
9 changes: 0 additions & 9 deletions src/asynch/mod.rs

This file was deleted.

11 changes: 10 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
#![doc = include_str!("../README.md")]
#![cfg_attr(not(test), no_std)]
pub mod asynch;

mod read;
mod write;

pub use read::BufferedRead;
pub use write::BufferedWrite;

/// Unable to bypass the current buffered reader or writer because there are buffered bytes.
#[derive(Debug)]
pub struct BypassError;
66 changes: 34 additions & 32 deletions src/asynch/read.rs → src/read.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
use embedded_io_async::{BufRead, Read, Write};
#[cfg(feature = "async")]
mod asynch;

use embedded_io::{BufRead, ErrorType, Read, Write};

use super::BypassError;

/// A buffered [`Read`]
///
/// The BufferedRead will read into the provided buffer to avoid small reads to the inner reader.
pub struct BufferedRead<'buf, T: Read> {
pub struct BufferedRead<'buf, T> {
inner: T,
buf: &'buf mut [u8],
offset: usize,
available: usize,
}

impl<'buf, T: Read> BufferedRead<'buf, T> {
impl<'buf, T> BufferedRead<'buf, T> {
/// Create a new buffered reader
pub fn new(inner: T, buf: &'buf mut [u8]) -> Self {
Self {
Expand Down Expand Up @@ -61,33 +64,33 @@ impl<'buf, T: Read> BufferedRead<'buf, T> {
}
}

impl<T: Read> embedded_io::ErrorType for BufferedRead<'_, T> {
impl<T: ErrorType> ErrorType for BufferedRead<'_, T> {
type Error = T::Error;
}

impl<T: Read + Write> Write for BufferedRead<'_, T> {
async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
self.inner.write(buf).await
fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
self.inner.write(buf)
}

async fn write_all(&mut self, buf: &[u8]) -> Result<(), Self::Error> {
self.inner.write_all(buf).await
fn write_all(&mut self, buf: &[u8]) -> Result<(), Self::Error> {
self.inner.write_all(buf)
}

async fn flush(&mut self) -> Result<(), Self::Error> {
self.inner.flush().await
fn flush(&mut self) -> Result<(), Self::Error> {
self.inner.flush()
}
}

impl<T: Read> Read for BufferedRead<'_, T> {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
if self.available == 0 {
if buf.len() >= self.buf.len() {
// Fast path - bypass local buffer
return self.inner.read(buf).await;
return self.inner.read(buf);
}
self.offset = 0;
self.available = self.inner.read(self.buf).await?;
self.available = self.inner.read(self.buf)?;
}

let len = usize::min(self.available, buf.len());
Expand All @@ -106,10 +109,10 @@ impl<T: Read> Read for BufferedRead<'_, T> {
}

impl<T: Read> BufRead for BufferedRead<'_, T> {
async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> {
fn fill_buf(&mut self) -> Result<&[u8], Self::Error> {
if self.available == 0 {
self.offset = 0;
self.available = self.inner.read(self.buf).await?;
self.available = self.inner.read(self.buf)?;
}

Ok(&self.buf[self.offset..self.offset + self.available])
Expand All @@ -124,71 +127,70 @@ impl<T: Read> BufRead for BufferedRead<'_, T> {

#[cfg(test)]
mod tests {
use super::*;
use embedded_io::{BufRead, Read};

use super::BufferedRead;

#[tokio::test]
async fn can_read_to_buffer() {
#[test]
fn can_read_to_buffer() {
let inner = [1, 2, 3, 4, 5, 6, 7, 8];
let mut buf = [0; 8];
let mut buffered = BufferedRead::new(inner.as_slice(), &mut buf);

let mut read_buf = [0; 2];
assert_eq!(2, buffered.read(&mut read_buf).await.unwrap());
assert_eq!(2, buffered.read(&mut read_buf).unwrap());
assert_eq!(2, buffered.offset);
assert_eq!(6, buffered.available);
assert_eq!(&[1, 2], read_buf.as_slice());

let mut read_buf = [0; 2];
assert_eq!(2, buffered.read(&mut read_buf).await.unwrap());
assert_eq!(2, buffered.read(&mut read_buf).unwrap());
assert_eq!(4, buffered.offset);
assert_eq!(4, buffered.available);
assert_eq!(&[3, 4], read_buf.as_slice());

let mut read_buf = [0; 8];
assert_eq!(4, buffered.read(&mut read_buf).await.unwrap());
assert_eq!(4, buffered.read(&mut read_buf).unwrap());
assert_eq!(4, buffered.offset);
assert_eq!(0, buffered.available);
assert_eq!(&[5, 6, 7, 8], &read_buf[..4]);
}

#[tokio::test]
async fn bypass_on_large_buf() {
#[test]
fn bypass_on_large_buf() {
let inner = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let mut buf = [0; 8];
let mut buffered = BufferedRead::new(inner.as_slice(), &mut buf);

let mut read_buf = [0; 10];
assert_eq!(10, buffered.read(&mut read_buf).await.unwrap());
assert_eq!(10, buffered.read(&mut read_buf).unwrap());
assert_eq!(0, buffered.offset);
assert_eq!(0, buffered.available);
assert_eq!(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], read_buf.as_slice());
}

#[tokio::test]
async fn can_buf_read() {
#[test]
fn can_buf_read() {
let inner = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let mut buf = [0; 8];
let mut buffered = BufferedRead::new(inner.as_slice(), &mut buf);
assert_eq!(0, buffered.offset);
assert_eq!(0, buffered.available);

assert_eq!(
&[1, 2, 3, 4, 5, 6, 7, 8],
buffered.fill_buf().await.unwrap()
);
assert_eq!(&[1, 2, 3, 4, 5, 6, 7, 8], buffered.fill_buf().unwrap());
assert_eq!(0, buffered.offset);
assert_eq!(8, buffered.available);

buffered.consume(2);
assert_eq!(2, buffered.offset);
assert_eq!(6, buffered.available);
assert_eq!(&[3, 4, 5, 6, 7, 8], buffered.fill_buf().await.unwrap());
assert_eq!(&[3, 4, 5, 6, 7, 8], buffered.fill_buf().unwrap());

buffered.consume(6);
assert_eq!(8, buffered.offset);
assert_eq!(0, buffered.available);

assert_eq!(&[9, 10], buffered.fill_buf().await.unwrap());
assert_eq!(&[9, 10], buffered.fill_buf().unwrap());
assert_eq!(0, buffered.offset);
assert_eq!(2, buffered.available);

Expand Down
136 changes: 136 additions & 0 deletions src/read/asynch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
use embedded_io_async::{BufRead, Read, Write};

use super::BufferedRead;

impl<T: Read + Write> Write for BufferedRead<'_, T> {
async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
self.inner.write(buf).await
}

async fn write_all(&mut self, buf: &[u8]) -> Result<(), Self::Error> {
self.inner.write_all(buf).await
}

async fn flush(&mut self) -> Result<(), Self::Error> {
self.inner.flush().await
}
}

impl<T: Read> Read for BufferedRead<'_, T> {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
if self.available == 0 {
if buf.len() >= self.buf.len() {
// Fast path - bypass local buffer
return self.inner.read(buf).await;
}
self.offset = 0;
self.available = self.inner.read(self.buf).await?;
}

let len = usize::min(self.available, buf.len());
buf[..len].copy_from_slice(&self.buf[self.offset..self.offset + len]);
if len < self.available {
// There are still bytes left
self.offset += len;
self.available -= len;
} else {
// The buffer is drained
self.available = 0;
}

Ok(len)
}
}

impl<T: Read> BufRead for BufferedRead<'_, T> {
async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> {
if self.available == 0 {
self.offset = 0;
self.available = self.inner.read(self.buf).await?;
}

Ok(&self.buf[self.offset..self.offset + self.available])
}

fn consume(&mut self, amt: usize) {
assert!(amt <= self.available);
self.offset += amt;
self.available -= amt;
}
}

#[cfg(test)]
mod async_tests {
use super::*;

#[tokio::test]
async fn can_read_to_buffer() {
let inner = [1, 2, 3, 4, 5, 6, 7, 8];
let mut buf = [0; 8];
let mut buffered = BufferedRead::new(inner.as_slice(), &mut buf);

let mut read_buf = [0; 2];
assert_eq!(2, buffered.read(&mut read_buf).await.unwrap());
assert_eq!(2, buffered.offset);
assert_eq!(6, buffered.available);
assert_eq!(&[1, 2], read_buf.as_slice());

let mut read_buf = [0; 2];
assert_eq!(2, buffered.read(&mut read_buf).await.unwrap());
assert_eq!(4, buffered.offset);
assert_eq!(4, buffered.available);
assert_eq!(&[3, 4], read_buf.as_slice());

let mut read_buf = [0; 8];
assert_eq!(4, buffered.read(&mut read_buf).await.unwrap());
assert_eq!(4, buffered.offset);
assert_eq!(0, buffered.available);
assert_eq!(&[5, 6, 7, 8], &read_buf[..4]);
}

#[tokio::test]
async fn bypass_on_large_buf() {
let inner = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let mut buf = [0; 8];
let mut buffered = BufferedRead::new(inner.as_slice(), &mut buf);

let mut read_buf = [0; 10];
assert_eq!(10, buffered.read(&mut read_buf).await.unwrap());
assert_eq!(0, buffered.offset);
assert_eq!(0, buffered.available);
assert_eq!(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], read_buf.as_slice());
}

#[tokio::test]
async fn can_buf_read() {
let inner = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let mut buf = [0; 8];
let mut buffered = BufferedRead::new(inner.as_slice(), &mut buf);
assert_eq!(0, buffered.offset);
assert_eq!(0, buffered.available);

assert_eq!(
&[1, 2, 3, 4, 5, 6, 7, 8],
buffered.fill_buf().await.unwrap()
);
assert_eq!(0, buffered.offset);
assert_eq!(8, buffered.available);

buffered.consume(2);
assert_eq!(2, buffered.offset);
assert_eq!(6, buffered.available);
assert_eq!(&[3, 4, 5, 6, 7, 8], buffered.fill_buf().await.unwrap());

buffered.consume(6);
assert_eq!(8, buffered.offset);
assert_eq!(0, buffered.available);

assert_eq!(&[9, 10], buffered.fill_buf().await.unwrap());
assert_eq!(0, buffered.offset);
assert_eq!(2, buffered.available);

buffered.consume(2);
assert_eq!(2, buffered.offset);
assert_eq!(0, buffered.available);
}
}
Loading