From 7368a103a6064232135f93d0c2ff45eb4444823d Mon Sep 17 00:00:00 2001 From: micheal-ndoh Date: Sun, 6 Apr 2025 10:55:38 +0100 Subject: [PATCH 1/8] docs(issue-2): add detailed docstrings and internal comments Signed-off-by: micheal-ndoh --- src/bin/client.rs | 11 +++++++++-- src/bin/server/connection.rs | 29 ++++++++++++++++++++++++++++- src/bin/server/group.rs | 23 ++++++++++++++++++++--- src/bin/server/group_table.rs | 13 +++++++++++++ src/bin/server/main.rs | 7 +++++++ src/lib.rs | 16 ++++++++++++---- src/utils.rs | 13 +++++++++++++ 7 files changed, 102 insertions(+), 10 deletions(-) diff --git a/src/bin/client.rs b/src/bin/client.rs index 0602bf7..c77c73c 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -3,12 +3,18 @@ use async_chat::{FromServer, utils}; use async_std::{io::BufReader, net, prelude::FutureExt, stream::StreamExt, task}; +/// Client binary for connecting to the async chat server. +/// +/// Expects one argument: the server address and port to connect to. +/// Example usage: `client 127.0.0.1:8080` fn main() -> anyhow::Result<()> { let address = std::env::args().nth(1).expect("Usage: client ADDRESS:PORT"); task::block_on(async { let socket = net::TcpStream::connect(address).await?; - socket.set_nodelay(true)?; + socket.set_nodelay(true)?; // Disable Nagle's algorithm for lower latency + + // Race two futures: sending commands vs. receiving server messages let to_server = send_commands(socket.clone()); let from_server = handle_replies(socket); @@ -17,11 +23,12 @@ fn main() -> anyhow::Result<()> { }) } +/// /// Reads user input (planned via `clap`) and sends commands to the server. async fn send_commands(_to_server: net::TcpStream) -> anyhow::Result<()> { // TODO: Implement use clap to parse command line arguments and print help message todo!() } - +/// Handles responses from the server and prints them to stdout. async fn handle_replies(from_server: net::TcpStream) -> anyhow::Result<()> { let buffered = BufReader::new(from_server); let mut reply_stream = utils::receive_as_json(buffered); diff --git a/src/bin/server/connection.rs b/src/bin/server/connection.rs index b3b11ed..af63529 100644 --- a/src/bin/server/connection.rs +++ b/src/bin/server/connection.rs @@ -7,11 +7,28 @@ use async_std::prelude::*; use async_std::sync::Arc; use async_std::sync::Mutex; +/// Represents a thread-safe outbound connection to a client. +/// This struct wraps a `TcpStream` in a `Mutex` to provide a safe and exclusive way to send data to the client. + pub struct Outbound(Mutex); impl Outbound { + /// Creates a new `Outbound` connection. + /// + /// # Arguments + /// + /// * `to_client` - The TCP stream to write to. pub fn new(to_client: TcpStream) -> Outbound { Outbound(Mutex::new(to_client)) } + /// Sends a message to the connected client in JSON format. + /// + /// # Arguments + /// + /// * `packet` - The message to send, wrapped in the `FromServer` enum. + /// + /// # Errors + /// + /// Returns an error if writing or flushing to the stream fails. pub async fn send(&self, packet: FromServer) -> anyhow::Result<()> { let mut guard = self.0.lock().await; utils::send_as_json(&mut *guard, &packet).await?; @@ -20,6 +37,16 @@ impl Outbound { } } +/// Serves a single client connection by reading messages and interacting with group state. +/// +/// # Arguments +/// +/// * `socket` - The TCP connection to the client. +/// * `groups` - A shared reference to the server's group table. +/// +/// # Errors +/// +/// Returns an error if reading from the socket or sending a message fails. pub async fn serve(socket: TcpStream, groups: Arc) -> anyhow::Result<()> { // wrapping our connection in outbound so as to have exclusive access to it in the groups and avoid interference let outbound = Arc::new(Outbound::new(socket.clone())); @@ -45,7 +72,7 @@ pub async fn serve(socket: TcpStream, groups: Arc) -> anyhow::Result None => Err(format!("Group '{}' does not exist", group_name)), }, }; - // not a valid request + // If an error occurred, send an error message back to the client if let Err(message) = result { let report = FromServer::Error(message); // send error back to client diff --git a/src/bin/server/group.rs b/src/bin/server/group.rs index 0e001e7..597a4ba 100644 --- a/src/bin/server/group.rs +++ b/src/bin/server/group.rs @@ -5,27 +5,44 @@ use async_std::task; use std::sync::Arc; use tokio::sync::broadcast; +/// A named group that broadcasts messages to all connected subscribers. pub struct Group { name: Arc, sender: broadcast::Sender>, } impl Group { + /// Creates a new `Group` with a given name. + /// + /// # Arguments + /// + /// * `name` - The name of the group. pub fn new(name: Arc) -> Group { - let (sender, _receiver) = broadcast::channel(1000); + let (sender, _receiver) = broadcast::channel(1000); // buffer size of 1000 messages Group { name, sender } } - + /// Adds a new outbound client to the group, subscribing them to messages. + /// + /// # Arguments + /// + /// * `outbound` - The client connection to receive messages. pub fn join(&self, outbound: Arc) { let receiver = self.sender.subscribe(); task::spawn(handle_subscriber(self.name.clone(), receiver, outbound)); } - + /// Posts a message to the group, broadcasting it to all subscribers. + /// + /// # Arguments + /// + /// * `message` - The message to broadcast. pub fn post(&self, message: Arc) { let _ = self.sender.send(message); // Ignoring the result to suppress warning } } +/// Handles the lifecycle of a subscriber: receiving messages and sending them over their connection. +/// +/// This is a stub — should be implemented to read from the `receiver` and forward messages to `outbound`. async fn handle_subscriber( _group_name: Arc, _receiver: broadcast::Receiver>, diff --git a/src/bin/server/group_table.rs b/src/bin/server/group_table.rs index 56f3725..f3d938a 100644 --- a/src/bin/server/group_table.rs +++ b/src/bin/server/group_table.rs @@ -4,13 +4,26 @@ use crate::group::Group; use std::collections::HashMap; use std::sync::{Arc, Mutex}; +/// A thread-safe table that stores all active chat groups by name. +/// +/// Internally wraps a `HashMap, Arc>` in a `Mutex` for safe concurrent access. pub struct GroupTable(Mutex, Arc>>); impl GroupTable { + /// Creates a new, empty `GroupTable`. pub fn new() -> GroupTable { GroupTable(Mutex::new(HashMap::new())) } + /// Retrieves a group by name, if it exists. + /// + /// # Arguments + /// + /// * `name` - The name of the group to retrieve. + /// + /// # Returns + /// + /// An `Option` containing the group, or `None` if it doesn't exist. pub fn get(&self, name: &String) -> Option> { self.0.lock().unwrap().get(name).cloned() } diff --git a/src/bin/server/main.rs b/src/bin/server/main.rs index 1d85de9..335347b 100644 --- a/src/bin/server/main.rs +++ b/src/bin/server/main.rs @@ -9,15 +9,21 @@ use async_std::prelude::*; use async_std::task; use std::sync::Arc; +/// The main entry point for the async-chat server. +/// +/// Accepts incoming TCP connections and spawns a task to handle each. +/// Expects one argument: the address to bind to (e.g., `127.0.0.1:8080`) fn main() -> anyhow::Result<()> { let address = std::env::args().nth(1).expect( "Usage: server ADDRESS", ); + // A thread-safe table that stores all active chat groups by name. let chat_group_table = Arc::new(group_table::GroupTable::new()); async_std::task::block_on(async { let listener = TcpListener::bind(address).await?; let mut new_connections = listener.incoming(); + // Accept incoming connections and spawn a task to handle each while let Some(socket_result) = new_connections.next().await { let socket = socket_result?; let groups = chat_group_table.clone(); @@ -29,6 +35,7 @@ fn main() -> anyhow::Result<()> { }) } +/// Logs errors from client handler tasks. fn log_error(result: anyhow::Result<()>) { if let Err(error) = result { eprintln!("Error: {}", error); diff --git a/src/lib.rs b/src/lib.rs index 4f1b1ae..87ec78a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,25 +1,33 @@ +//! # async-chat +//! +//! A simple async group chat system implemented in Rust, using `async-std` for concurrency. +//! This crate defines the message formats and utility functions used by both the client and server. + use std::sync::Arc; use serde::{Deserialize, Serialize}; pub mod utils; +/// Messages that clients can send to the server. #[derive(Debug, Deserialize, Serialize, PartialEq)] pub enum FromClient { - Join { - group_name: Arc, - }, + /// Join a group by name. + Join { group_name: Arc }, + /// Post a message to a group. Post { group_name: Arc, message: Arc, }, } - +/// Messages that the server sends back to clients. #[derive(Debug, Deserialize, Serialize)] pub enum FromServer { + /// A message has been posted to a group. Message { group_name: Arc, message: Arc, }, + /// The server encountered an error. Error(String), } diff --git a/src/utils.rs b/src/utils.rs index 3859aa5..4a9a0fa 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,6 +1,11 @@ use async_std::prelude::*; use serde::de::DeserializeOwned; +/// Sends a serializable packet as a JSON-encoded line over a writable stream. +/// +/// # Arguments +/// * `outbound` - The writable stream to send through. +/// * `packet` - The serializable data to be sent. pub async fn send_as_json(outbound: &mut S, packet: &P) -> anyhow::Result<()> where S: async_std::io::Write + Unpin, @@ -12,6 +17,14 @@ where Ok(()) } +/// Returns a stream of deserialized packets from a buffered input stream. +/// +/// # Arguments +/// * `inbound` - A stream of lines containing JSON messages. +/// +/// # Returns +/// A stream of parsed packets of type `P`. + pub fn receive_as_json(inbound: S) -> impl Stream> where S: async_std::io::BufRead + Unpin, From 1e23f08709ece3079430b666256918e76c93acfa Mon Sep 17 00:00:00 2001 From: micheal-ndoh Date: Sun, 6 Apr 2025 11:22:48 +0100 Subject: [PATCH 2/8] refactor: remove unnecessary blank lines in connection and utils modules --- src/bin/server/connection.rs | 1 - src/utils.rs | 1 - 2 files changed, 2 deletions(-) diff --git a/src/bin/server/connection.rs b/src/bin/server/connection.rs index af63529..f300a73 100644 --- a/src/bin/server/connection.rs +++ b/src/bin/server/connection.rs @@ -9,7 +9,6 @@ use async_std::sync::Mutex; /// Represents a thread-safe outbound connection to a client. /// This struct wraps a `TcpStream` in a `Mutex` to provide a safe and exclusive way to send data to the client. - pub struct Outbound(Mutex); impl Outbound { /// Creates a new `Outbound` connection. diff --git a/src/utils.rs b/src/utils.rs index 4a9a0fa..dc05761 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -24,7 +24,6 @@ where /// /// # Returns /// A stream of parsed packets of type `P`. - pub fn receive_as_json(inbound: S) -> impl Stream> where S: async_std::io::BufRead + Unpin, From f1ff285ef6a679e0044da680dcbcd0d72a43d74e Mon Sep 17 00:00:00 2001 From: micheal-ndoh Date: Sun, 6 Apr 2025 18:24:26 +0100 Subject: [PATCH 3/8] docs(issue-2): explain task spawning in join method Signed-off-by: micheal-ndoh --- src/bin/server/group.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/bin/server/group.rs b/src/bin/server/group.rs index 597a4ba..0e93150 100644 --- a/src/bin/server/group.rs +++ b/src/bin/server/group.rs @@ -21,11 +21,15 @@ impl Group { let (sender, _receiver) = broadcast::channel(1000); // buffer size of 1000 messages Group { name, sender } } - /// Adds a new outbound client to the group, subscribing them to messages. + /// Adds a client connection to the group and starts sending messages to it. /// /// # Arguments /// /// * `outbound` - The client connection to receive messages. + /// + /// This function spawns a background task to handle receiving messages from the + /// broadcast channel and forwarding them to the client. A task is used so that + /// the message receiving loop can run asynchronously without blocking the caller. pub fn join(&self, outbound: Arc) { let receiver = self.sender.subscribe(); task::spawn(handle_subscriber(self.name.clone(), receiver, outbound)); From 44d76e27e865cc542dd8a1d728f0730e0db18ddf Mon Sep 17 00:00:00 2001 From: micheal-ndoh Date: Sun, 6 Apr 2025 18:30:16 +0100 Subject: [PATCH 4/8] docs(issue-2): clarify error condition in serve function Signed-off-by: micheal-ndoh --- src/bin/server/connection.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/bin/server/connection.rs b/src/bin/server/connection.rs index f300a73..9f2c492 100644 --- a/src/bin/server/connection.rs +++ b/src/bin/server/connection.rs @@ -45,7 +45,10 @@ impl Outbound { /// /// # Errors /// -/// Returns an error if reading from the socket or sending a message fails. +/// Returns an error if: +/// - Reading from the socket fails +/// - Sending a message fails +/// - A user tries to post to a group that does not exist pub async fn serve(socket: TcpStream, groups: Arc) -> anyhow::Result<()> { // wrapping our connection in outbound so as to have exclusive access to it in the groups and avoid interference let outbound = Arc::new(Outbound::new(socket.clone())); From d2157633ca85ea439540b1639fbbf4ac9e9e2a21 Mon Sep 17 00:00:00 2001 From: micheal-ndoh Date: Sun, 6 Apr 2025 18:33:57 +0100 Subject: [PATCH 5/8] docs(issue-2): clarify comment on handling incoming connections in main function Signed-off-by: micheal-ndoh --- src/bin/server/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bin/server/main.rs b/src/bin/server/main.rs index 335347b..048d6b5 100644 --- a/src/bin/server/main.rs +++ b/src/bin/server/main.rs @@ -23,7 +23,7 @@ fn main() -> anyhow::Result<()> { async_std::task::block_on(async { let listener = TcpListener::bind(address).await?; let mut new_connections = listener.incoming(); - // Accept incoming connections and spawn a task to handle each + // Accept incoming connections and spawn an asynchronous task to handle each while let Some(socket_result) = new_connections.next().await { let socket = socket_result?; let groups = chat_group_table.clone(); From a8b466ec4e9b01b15e339d07bda144bd6ac16801 Mon Sep 17 00:00:00 2001 From: micheal-ndoh Date: Sun, 6 Apr 2025 18:35:15 +0100 Subject: [PATCH 6/8] docs(issue-2): fix duplicate slashes in doc comment for send_commands function Signed-off-by: micheal-ndoh --- src/bin/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bin/client.rs b/src/bin/client.rs index c77c73c..1fc3f05 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -23,7 +23,7 @@ fn main() -> anyhow::Result<()> { }) } -/// /// Reads user input (planned via `clap`) and sends commands to the server. +/// Reads user input (planned via `clap`) and sends commands to the server. async fn send_commands(_to_server: net::TcpStream) -> anyhow::Result<()> { // TODO: Implement use clap to parse command line arguments and print help message todo!() From fe444234996da1be3f26b12a39be860fca5f818a Mon Sep 17 00:00:00 2001 From: micheal-ndoh Date: Sun, 6 Apr 2025 19:46:59 +0100 Subject: [PATCH 7/8] docs(issue-2): fix duplicate slashes in doc comment for send_commands function Signed-off-by: micheal-ndoh --- src/bin/client.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/bin/client.rs b/src/bin/client.rs index 1fc3f05..b5529d2 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -12,9 +12,9 @@ fn main() -> anyhow::Result<()> { task::block_on(async { let socket = net::TcpStream::connect(address).await?; - socket.set_nodelay(true)?; // Disable Nagle's algorithm for lower latency + socket.set_nodelay(true)?; // Disable Nagle's algorithm for lower latency. - // Race two futures: sending commands vs. receiving server messages + // Race two futures: sending commands vs. receiving server. let to_server = send_commands(socket.clone()); let from_server = handle_replies(socket); From d23d02448fec1b8cac80ef7945a7d9ce6d55747a Mon Sep 17 00:00:00 2001 From: micheal-ndoh Date: Wed, 9 Apr 2025 10:20:07 +0100 Subject: [PATCH 8/8] docs(issue-2): refine documentation for handle_replies function to clarify stdout output behavior Signed-off-by: micheal-ndoh --- src/bin/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bin/client.rs b/src/bin/client.rs index b5529d2..a9b9ff4 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -28,7 +28,7 @@ async fn send_commands(_to_server: net::TcpStream) -> anyhow::Result<()> { // TODO: Implement use clap to parse command line arguments and print help message todo!() } -/// Handles responses from the server and prints them to stdout. +/// Handles responses from the server and prints them to stdout as they arrive. async fn handle_replies(from_server: net::TcpStream) -> anyhow::Result<()> { let buffered = BufReader::new(from_server); let mut reply_stream = utils::receive_as_json(buffered);