diff --git a/examples/parse_stream.rs b/examples/parse_stream.rs new file mode 100644 index 0000000..354acda --- /dev/null +++ b/examples/parse_stream.rs @@ -0,0 +1,33 @@ +use std::io::{Cursor, Read}; +use nmea::stream::NmeaStreamParser; + +fn main() { + // Example data source: a Cursor over a byte slice + let data = b"$GPGLL,4916.45,N,12311.12,W,225444,A,*1D\r\n$GPGLL,4916.45,N,12311.12,W,225444,A,*1D\r\n"; + let mut reader = Cursor::new(data); + + let mut parser = NmeaStreamParser::new(); + + let chunk_size = 10; + let mut buffer = vec![0; chunk_size]; + + // Read and process data in chunks + while let Ok(n) = reader.read(&mut buffer) { + if n == 0 { + break; // End of stream + } + + // Process the chunk and get complete messages + let messages = parser.process_chunk(&buffer[..n]); + + // Handle each complete message + for message in messages { + // Convert the message from Vec to a string for display + if let Ok(message_str) = String::from_utf8(message) { + println!("Parsed message: {}", message_str); + } else { + eprintln!("Failed to convert message to string"); + } + } + } +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 90308cc..5c1531e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,6 +32,7 @@ mod error; pub(crate) mod parse; mod parser; +pub mod stream; pub mod sentences; diff --git a/src/parser.rs b/src/parser.rs index f35136d..da72545 100644 --- a/src/parser.rs +++ b/src/parser.rs @@ -245,6 +245,7 @@ impl<'a> Nmea { /// - and other /// /// The type of sentence is returned if implemented and valid. + // MARK: parse which currently works based on strings pub fn parse(&mut self, sentence: &'a str) -> Result> { match parse_str(sentence)? { ParseResult::VTG(vtg) => { diff --git a/src/stream.rs b/src/stream.rs new file mode 100644 index 0000000..ee9b03a --- /dev/null +++ b/src/stream.rs @@ -0,0 +1,158 @@ +use std::io::Read; + +// an iterator-based parser that will parse new messages +// from e.g. streamed/received bytes when \r\n ending is present. +// +// we should 'chunkinate' the bytes based on the \r\n ending + +pub struct NmeaStreamParser { + buffer: Vec, + separator: Vec, +} + +impl NmeaStreamParser { + pub fn new() -> Self { + NmeaStreamParser { + buffer: Vec::new(), + separator: b"\r\n".to_vec(), + } + } + + pub fn process_chunk(&mut self, chunk: &[u8]) -> Vec> { + self.buffer.extend_from_slice(chunk); + let mut messages = Vec::new(); + + while let Some(pos) = self + .buffer + .windows(self.separator.len()) + .position(|window| window == self.separator.as_slice()) + { + let message = self.buffer.drain(..pos).collect(); + messages.push(message); + self.buffer.drain(..self.separator.len()); + } + + messages + } +} + +#[allow(dead_code)] +struct MessageStream { + reader: R, + parser: NmeaStreamParser, + chunk_size: usize, +} + +impl MessageStream { + #[allow(dead_code)] + fn new(reader: R, chunk_size: usize) -> Self { + MessageStream { + reader, + parser: NmeaStreamParser::new(), + chunk_size, + } + } +} + +impl Iterator for MessageStream { + type Item = Result, std::io::Error>; + + fn next(&mut self) -> Option { + let mut chunk = vec![0; self.chunk_size]; + loop { + match self.reader.read(&mut chunk) { + Ok(0) => return None, // End of stream + Ok(n) => { + let mut messages = self.parser.process_chunk(&chunk[..n]); + if !messages.is_empty() { + return Some(Ok(messages.remove(0))); + } + } + Err(e) => return Some(Err(e)), + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_stream_parser() { + let mut parser = NmeaStreamParser::new(); + let chunk = b"$GPGLL,4916.45,N,12311.12,W,225444,A,*1D\r\n$GPGLL,4916.45,N,12311.12,W,225444,A,*1D\r\n"; + let messages = parser.process_chunk(chunk); + assert_eq!(messages.len(), 2); + } + + #[test] + fn test_stream_parser_partial_message() { + let mut parser = NmeaStreamParser::new(); + let chunk1 = b"$GPGLL,4916.45,N,123"; + let chunk2 = b"11.12,W,225444,A,*1D\r\n"; + + let messages = parser.process_chunk(chunk1); + assert_eq!(messages.len(), 0); + + let messages = parser.process_chunk(chunk2); + assert_eq!(messages.len(), 1); + } + + #[test] + fn test_stream_parser_multiple_chunks() { + let mut parser = NmeaStreamParser::new(); + let chunks = [ + b"$GPGLL,4916.45,N,12311.12,W,2254", + b"44,A,*1D\r\n$GPGLL,4916.45,N,12300", + b"11.12,W,225444,A,*1D\r\n$GPGLL,4..", + b"916.45,N,12311.12,W,225444,A,*1D", + b"\r\n$GPGLL,4916.45,N,12311.12,W,..", + b"225444,A,*1D\r\n..................", + b"$GPGLL,4916.45,N,12311.12,W,2254", + b"44,A,*1D\r\n$GPGLL,4916.45,N,123..", + b"11.12,W,225444,A,*1D\r\n..........", + ]; + + let mut total_messages = 0; + for chunk in chunks { + let messages = parser.process_chunk(chunk); + total_messages += messages.len(); + } + assert_eq!(total_messages, 6); + } + + #[test] + fn test_stream_parser_invalid_data() { + let mut parser = NmeaStreamParser::new(); + let chunk = b"Invalid data without separators"; + let messages = parser.process_chunk(chunk); + assert_eq!(messages.len(), 0); + } + + #[test] + fn test_stream_parser_empty_chunk() { + let mut parser = NmeaStreamParser::new(); + let messages = parser.process_chunk(b""); + assert_eq!(messages.len(), 0); + } + + #[test] + fn test_message_stream() { + let data = b"$GPGLL,4916.45,N,12311.12,W,225444,A,*1D\r\n$GPGLL,4916.45,N,12311.12,W,225444,A,*1D\r\n"; + let stream = MessageStream::new(&data[..], 10); + let messages: Vec, std::io::Error>> = stream.collect(); + assert_eq!(messages.len(), 2); + } + + #[test] + fn test_empty_message_stream() { + let data = b""; + let stream = MessageStream::new(&data[..], 10); + + // check that result is ok + for message in stream { + assert!(message.is_ok()); + } + } +}