Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion dscale/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "dscale"
version = "0.3.0"
version = "0.3.1"
edition = "2024"
authors = ["Konstantin Shprenger <konstantin@shprenger.com>"]
license = "MIT"
Expand Down
91 changes: 48 additions & 43 deletions dscale/src/helpers/combiner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@

use std::usize;

/// A compile-time sized collector for gathering multiple values.
/// A runtime-configured collector for gathering multiple values.
///
/// `Combiner` is designed for scenarios where you need to collect exactly `K`
/// values before proceeding with computation. It's particularly useful for
/// `Combiner` is designed for scenarios where you need to collect exactly a specified
/// number of values before proceeding with computation. It's particularly useful for
/// implementing distributed system patterns such as:
///
/// - **Quorum Systems**: Wait for a majority of responses before making decisions
Expand All @@ -20,16 +20,15 @@ use std::usize;
///
/// # Design Philosophy
///
/// - **Compile-Time Size**: The collection size `K` is known at compile time,
/// enabling stack allocation and zero-cost abstractions
/// - **Runtime Size**: The collection size is specified at runtime during construction,
/// providing flexibility while maintaining efficiency
/// - **One-Shot**: Each combiner instance produces exactly one complete collection
/// - **Type Safety**: Generic over the value type `T` with compile-time guarantees
/// - **Memory Efficient**: Uses stack allocation for small to medium collection sizes
/// - **Type Safety**: Generic over the value type `T` with runtime guarantees
/// - **Memory Efficient**: Uses vector with pre-allocated capacity for efficiency
///
/// # Generic Parameters
///
/// - `T`: The type of values to collect. Must implement `Sized`.
/// - `K`: The number of values to collect (compile-time constant)
///
/// # Examples
///
Expand All @@ -39,7 +38,7 @@ use std::usize;
/// use dscale::helpers::Combiner;
///
/// // Collect exactly 3 responses for a quorum
/// let mut quorum: Combiner<String, 3> = Combiner::new();
/// let mut quorum: Combiner<String> = Combiner::new(3);
///
/// // Add responses one by one
/// assert!(quorum.combine("vote_yes".to_string()).is_none()); // Not ready yet
Expand Down Expand Up @@ -75,7 +74,7 @@ use std::usize;
/// fn start(&mut self) {
/// // Start a new consensus round
/// self.proposal_id = 1;
/// self.vote_collector = Some(Combiner::new());
/// self.vote_collector = Some(Combiner::new(3));
///
/// // Send vote requests to other processes
/// // send_to(1, VoteMessage { proposal_id: 1, vote: true });
Expand Down Expand Up @@ -121,7 +120,7 @@ use std::usize;
/// }
///
/// fn collect_responses() {
/// let mut collector: Combiner<ServerResponse, 5> = Combiner::new();
/// let mut collector: Combiner<ServerResponse> = Combiner::new(5);
///
/// // Simulate receiving responses from 5 servers
/// let responses = vec![
Expand All @@ -147,10 +146,10 @@ use std::usize;
///
/// # Performance Characteristics
///
/// - **Memory**: Uses stack allocation for the internal array
/// - **Memory**: Uses vector with pre-allocated capacity
/// - **Time Complexity**: O(1) for `combine()` operations
/// - **Space Complexity**: O(K) where K is the collection size
/// - **Zero Allocation**: No heap allocations during normal operation
/// - **Space Complexity**: O(threshold) where threshold is the collection size
/// - **Minimal Allocation**: Single allocation during construction
///
/// # Common Use Cases in Distributed Systems
///
Expand All @@ -164,21 +163,22 @@ use std::usize;
///
/// `Combiner` is not thread-safe by default, but this is not a concern in
/// DScale's single-threaded simulation environment.
pub struct Combiner<T: Sized, const K: usize> {
quorum: [Option<T>; K],
pub struct Combiner<T: Sized> {
values: Vec<T>,
threshold: usize,
idx: usize,
}

impl<T: Sized, const K: usize> Combiner<T, K> {
/// Creates a new combiner that will collect exactly `K` values.
impl<T: Sized> Combiner<T> {
/// Creates a new combiner that will collect exactly `threshold` values.
///
/// This constructor initializes an empty combiner ready to accept values
/// through the [`combine`] method. The combiner will return `None` from
/// [`combine`] until exactly `K` values have been provided.
/// [`combine`] until exactly `threshold` values have been provided.
///
/// # Compile-Time Requirements
/// # Requirements
///
/// The constant `K` must be greater than 0. This is enforced by a debug
/// The `threshold` must be greater than 0. This is enforced by a debug
/// assertion to catch programming errors during development.
///
/// # Examples
Expand All @@ -187,20 +187,24 @@ impl<T: Sized, const K: usize> Combiner<T, K> {
/// use dscale::helpers::Combiner;
///
/// // Create combiners for different quorum sizes
/// let simple_majority: Combiner<bool, 3> = Combiner::new();
/// let byzantine_quorum: Combiner<String, 7> = Combiner::new(); // 2f+1 for f=3
/// let unanimous: Combiner<u32, 5> = Combiner::new();
/// let simple_majority: Combiner<bool> = Combiner::new(3);
/// let byzantine_quorum: Combiner<String> = Combiner::new(7); // 2f+1 for f=3
/// let unanimous: Combiner<u32> = Combiner::new(5);
/// ```
///
/// # Panics
///
/// In debug builds, panics if `K` is 0.
/// In debug builds, panics if `threshold` is 0.
///
/// [`combine`]: Combiner::combine
pub fn new() -> Self {
debug_assert!(K > 0, "Quorum threshold should be greater than zero");
pub fn new(threshold: usize) -> Self {
debug_assert!(
threshold > 0,
"Combinter threshold should be greater than zero"
);
Self {
quorum: [const { None }; K],
values: Vec::with_capacity(threshold),
threshold,
idx: 0,
}
}
Expand All @@ -209,16 +213,16 @@ impl<T: Sized, const K: usize> Combiner<T, K> {
///
/// This method accepts one value and adds it to the internal collection.
/// It returns:
/// - `None` if fewer than `K` values have been collected
/// - `Some([T; K])` when exactly `K` values have been collected
/// - `None` if fewer than `threshold` values have been collected
/// - `Some(&[T])` when exactly `threshold` values have been collected
///
/// Once a complete collection is returned, the combiner is considered
/// "consumed" and subsequent calls will always return `None`.
///
/// # Behavior
///
/// - **Before Completion**: Returns `None` and stores the value internally
/// - **At Completion**: Returns `Some(array)` containing all K values in order
/// - **At Completion**: Returns `Some(slice)` containing all values in order
/// - **After Completion**: Always returns `None` (combiner is exhausted)
///
/// # Parameters
Expand All @@ -228,7 +232,7 @@ impl<T: Sized, const K: usize> Combiner<T, K> {
/// # Returns
///
/// - `None` if the collection is not yet complete
/// - `Some([T; K])` when the collection is complete, containing all values in insertion order
/// - `Some(&[T])` when the collection is complete, containing all values in insertion order
///
/// # Examples
///
Expand All @@ -237,15 +241,15 @@ impl<T: Sized, const K: usize> Combiner<T, K> {
/// ```rust
/// use dscale::helpers::Combiner;
///
/// let mut collector: Combiner<i32, 3> = Combiner::new();
/// let mut collector: Combiner<i32> = Combiner::new(3);
///
/// // First two values return None
/// assert!(collector.combine(10).is_none());
/// assert!(collector.combine(20).is_none());
///
/// // Third value completes the collection
/// if let Some(values) = collector.combine(30) {
/// assert_eq!(values, [10, 20, 30]);
/// assert_eq!(values, &[10, 20, 30]);
/// }
///
/// // Subsequent calls return None
Expand All @@ -258,7 +262,7 @@ impl<T: Sized, const K: usize> Combiner<T, K> {
/// use dscale::helpers::Combiner;
///
/// fn process_votes() -> bool {
/// let mut vote_collector: Combiner<bool, 5> = Combiner::new();
/// let mut vote_collector: Combiner<bool> = Combiner::new(5);
///
/// // Simulate receiving votes
/// let votes = [true, true, false, true, false];
Expand Down Expand Up @@ -287,7 +291,7 @@ impl<T: Sized, const K: usize> Combiner<T, K> {
/// }
///
/// fn handle_responses() {
/// let mut collector: Combiner<Response, 3> = Combiner::new();
/// let mut collector: Combiner<Response> = Combiner::new(3);
///
/// // Process responses as they arrive
/// let responses = [
Expand Down Expand Up @@ -321,15 +325,16 @@ impl<T: Sized, const K: usize> Combiner<T, K> {
/// - Values are stored in insertion order
/// - Memory is pre-allocated on the stack for efficiency
/// - The operation is O(1) with no heap allocations
pub fn combine(&mut self, value: T) -> Option<[T; K]> {
self.quorum[self.idx] = Some(value);
pub fn combine(&mut self, value: T) -> Option<&[T]> {
if self.idx >= self.threshold {
return None;
}

self.values.push(value);
self.idx += 1;

if self.idx == K {
let mut result = [const { None }; K];
std::mem::swap(&mut result, &mut self.quorum);
// Unwraping is safe because we know all slots are filled
Some(result.map(|opt| opt.unwrap()))
if self.idx == self.threshold {
Some(&self.values)
} else {
None
}
Expand Down