Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,3 @@ jobs:

- name: Verify DScale
run: cargo run --bin ${{ matrix.binary }} --release --package examples
env:
RUST_LOG: info
80 changes: 44 additions & 36 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@ This project provides a fast & deterministic simulation framework for testing an

To use the DScale, you need to implement the `ProcessHandle` trait for your distributed system and the `Message` trait for the data exchanged between processes.

### 1. Define Messages
### 1. Install

```shell
cargo add dscale
```

### 2. Define Messages

Messages must implement the `Message` trait, which allows defining a `virtual_size` for bandwidth simulation.

Expand All @@ -26,7 +32,7 @@ impl Message for MyMessage {
}
```

### 2. Implement Process Logic
### 3. Implement Process Logic

Implement `ProcessHandle` to define how your process reacts to initialization, messages, and timers.

Expand Down Expand Up @@ -58,7 +64,7 @@ impl ProcessHandle for MyProcess {
}
```

### 3. run the Simulation
### 4. Run the Simulation

Use `Simulationbuilder` to configure the topology, network constraints, and start the simulation.

Expand Down Expand Up @@ -86,70 +92,72 @@ fn main() {
### Simulation Control

- **`SimulationBuilder`**: Configures the simulation environment.
- `default()`: Creates simulation with no processes and default parameters.
- `seed(u64)`: Sets the random seed for deterministic execution.
- `time_budget(Jiffies)`: Sets the maximum duration of the simulation.
- `add_pool<P: ProcessHandle + Default + 'static>(&str, usize)`: Creates a pool of processes.
- `latency_topology(&[LatencyDescription])`: Configures network latency between pools or within them.
- `nic_bandwidth(BandwidthDescription)`: Configures network bandwidth limits (per process).
- `Bounded(usize)`: Limits bandwidth (bytes per jiffy).
- `default`: Creates simulation with no processes and default parameters.
- `seed`: Sets the random seed for deterministic execution.
- `time_budget`: Sets the maximum duration of the simulation.
- `add_pool`: Creates a pool of processes. (At the same time all procs become part of GLOBAL_POOL)
- `latency_topology`: Configures network latency between pools or within them.
- `nic_bandwidth`: Configures network bandwidth limits (per process).
- `Bounded`: Limits bandwidth (bytes per jiffy).
- `Unbounded`: No bandwidth limits.
- `build() -> Simulation`: Finalizes configuration and builds the simulation engine.
- `build`: Finalizes configuration and builds the simulation engine.
- **`Simulation`**: The engine driving the event loop.
- `run()`: Starts the simulation loop.
- `run`: Starts the simulation loop.

### Network Topology

- **`GLOBAL_POOL`**:
- Contains all processes. Simple broadcast uses this pool
- **`LatencyDescription`**:
- `WithinPool(&str, Distributions)`: Latency for messages between processes in the same pool.
- `BetweenPools(&str, &str, Distributions)`: Latency for messages between processes in different pools.
- `WithinPool`: Latency for messages between processes in the same pool.
- `BetweenPools`: Latency for messages between processes in different pools.
- **`Distributions`**:
- `Uniform(Jiffies, Jiffies)`
- `Bernoulli(f64, Jiffies)`
- `Normal(Jiffies, Jiffies)`
- `Uniform`
- `Bernoulli`
- `Normal`

### Process Interaction (Context-Aware)

These functions are available globally but must be called within the context of a running process step.

- **`broadcasst(impl Message)`**: Sends a message to all other processes.
- **`broadcasst_within_pool(&str, impl Message)`**: Sends a message to all other processes within a specific pool.
- **`send_to(ProcessId, impl Message)`**: Sends a message to a specific process.
- **`send_random_from_pool(&str, impl Message)`**: Sends a message to random process whithin pool.
- **`schedule_timer_after(Jiffies) -> TimerId`**: Schedules a timer interrupt for the current process.
- **`rank() -> ProcessId`**: Returns the ID of the currently executing process.
- **`now() -> Jiffies`**: Returns current simulation time.
- **`list_pool(&str) -> Vec<ProcessId>`**: List all processes in a pool.
- **`choose_from_pool(&str) -> ProcessId`**: Choose random process id from specified pool.
- **`global_unique_id() -> usize`**: Generates a globally unique ID.
- **`broadcasst`**: Sends a message to all other processes. (GLOBAL_POOL)
- **`broadcasst_within_pool`**: Sends a message to all other processes within a specific pool.
- **`send_to`**: Sends a message to a specific process.
- **`send_random_from_pool`**: Sends a message to random process whithin pool.
- **`schedule_timer_after`**: Schedules a timer interrupt for the current process.
- **`rank`**: Returns the ID of the currently executing process.
- **`now`**: Returns current simulation time.
- **`list_pool`**: List all processes in a pool.
- **`choose_from_pool`**: Choose random process id from specified pool.
- **`global_unique_id`**: Generates a globally unique ID.

### Configuration (`dscale::global::configuration`)

- **`seed() -> u64`**: Returns the specific seed for the current process.
- **`process_number() -> usize`**: Returns total number of processes in the simulation.
- **`seed`**: Returns the specific seed for the current process.
- **`process_number`**: Returns total number of processes in the simulation.

### Any Key-Value (`dscale::global::anykv`)

Useful for passing shared state, metrics, or configuration between processes or back to the host.

- **`get<T>(&str) -> T`**
- **`set<T>(&str, T)`**
- **`modify<T>(&str, impl FnOnce(&mut T))`**: Modify in-place.
- **`get -> T`**
- **`set(T)`**
- **`modify`**: Modify in-place.

### Helpers (`dscale::helpers`)

- **`debug_process!(fmt, ...)`**: A macro that automatically prepends current simulation time and process ID.
- **`debug_process!`**: A macro that automatically prepends current simulation time and process ID.
- **`Combiner`**: Structure which allows combining any values up to some known threshols. Can be useful for waiting for quorums.

## Logging Configuration (`RUST_LOG`)

DScale output is controlled via the `RUST_LOG` environment variable.

- **`RUST_LOG=info`**: Shows high-level simulation status and a progress bar.
- **`RUST_LOG=debug`**: Enables the `debug_process!` macro output and internal simulation events.
- **`RUST_LOG=your_crate=debug`**: Filter events only for your specific crate.
- **`RUST_LOG=debug`**: Enables all `debug_process!` macro output and all internal simulation events.
- **`RUST_LOG=full::path::to::your::file::or::crate=debug`**: Filter events only for your specific file or crate.

- Note `RUST_LOG=debug or RUST_LOG=any=debug` will work only without the `--release` flag.
- Note `RUST_LOG=debug or RUST_LOG=any::path=debug` will work only without the `--release` flag.

## Thanks to

Expand Down
1 change: 0 additions & 1 deletion dscale/src/communication/destination.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::ProcessId;

pub enum Destination {
Broadcast,
BroadcastWithinPool(&'static str),
To(ProcessId),
}
14 changes: 12 additions & 2 deletions dscale/src/global/access.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use std::{cell::RefCell, rc::Rc};

use crate::now;

use crate::{
Destination, Message, ProcessId,
actor::EventSubmitter,
debug_process,
network::NetworkActor,
random::Randomizer,
time::{
Jiffies,
timer_manager::{TimerId, TimerManagerActor, next_timer_id},
},
topology::Topology,
topology::{GLOBAL_POOL, Topology},
};

pub struct SimulationAccess {
Expand Down Expand Up @@ -68,7 +71,7 @@ impl SimulationAccess {
fn broadcast(&mut self, message: impl Message + 'static) {
self.scheduled_messages.push((
self.process_on_execution,
Destination::Broadcast,
Destination::BroadcastWithinPool(GLOBAL_POOL),
Rc::new(message),
));
}
Expand Down Expand Up @@ -144,22 +147,27 @@ pub(crate) fn schedule() {
}

pub fn schedule_timer_after(after: Jiffies) -> TimerId {
debug_process!("Access: scheduling timer after {after}");
with_access(|access| access.schedule_timer_after(after))
}

pub fn broadcast(message: impl Message + 'static) {
debug_process!("Access: broadcasting globally");
with_access(|access| access.broadcast(message));
}

pub fn broadcast_within_pool(pool: &'static str, message: impl Message + 'static) {
debug_process!("Access: broadcasting within: {pool}");
with_access(|access| access.broadcast_within_pool(pool, message));
}

pub fn send_to(to: ProcessId, message: impl Message + 'static) {
debug_process!("Access: send to: {to}");
with_access(|access| access.send_to(to, message));
}

pub fn send_random_from_pool(pool: &'static str, message: impl Message + 'static) {
debug_process!("Access: sending random from pool: {}", pool);
with_access(|access| access.send_random_from_pool(pool, message));
}

Expand All @@ -168,9 +176,11 @@ pub fn rank() -> ProcessId {
}

pub fn list_pool(name: &str) -> Vec<ProcessId> {
debug_process!("Access: listing pool: {name}");
with_access(|access| access.list_pool(name).to_vec())
}

pub fn choose_from_pool(name: &str) -> ProcessId {
debug_process!("Access: choosing random from pool: {name}");
with_access(|access| access.choose_from_pool(name))
}
1 change: 1 addition & 0 deletions dscale/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub use global::send_to;

pub use network::BandwidthDescription;

pub use topology::GLOBAL_POOL;
pub use topology::LatencyDescription;

pub use random::Distributions;
Expand Down
9 changes: 3 additions & 6 deletions dscale/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,13 @@ impl Network {
destination: Destination,
) {
let targets = match destination {
Destination::Broadcast => self.nursery.keys().copied().collect::<Vec<ProcessId>>(),
Destination::BroadcastWithinPool(pool_name) => {
self.topology.list_pool(pool_name).to_vec()
}
Destination::To(to) => vec![to],
Destination::BroadcastWithinPool(pool_name) => self.topology.list_pool(pool_name),
Destination::To(to) => &[to],
};

debug!("Submitting message from {source}, targets of the message: {targets:?}",);

targets.into_iter().for_each(|target| {
targets.into_iter().copied().for_each(|target| {
let routed_message = RoutedMessage {
arrival_time: now() + Jiffies(1), // Without any latency message will arrive on next timepoint;
step: ProcessStep {
Expand Down
5 changes: 2 additions & 3 deletions dscale/src/process/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! by all processes in DScale simulations, as well as the `ProcessId` type used
//! for process identification throughout the system.

use std::cell::RefCell;
use std::{cell::RefCell, rc::Rc};

use crate::{MessagePtr, time::timer_manager::TimerId};

Expand Down Expand Up @@ -58,8 +58,7 @@ use crate::{MessagePtr, time::timer_manager::TimerId};
/// [`ProcessHandle::on_message`]: ProcessHandle::on_message
pub type ProcessId = usize;

pub(crate) type UniqueProcessHandle = Box<dyn ProcessHandle>;
pub(crate) type MutableProcessHandle = RefCell<UniqueProcessHandle>;
pub(crate) type MutableProcessHandle = Rc<RefCell<dyn ProcessHandle>>;

/// Core trait that defines the behavior of a process in DScale simulations.
///
Expand Down
1 change: 0 additions & 1 deletion dscale/src/process/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,3 @@ mod handle;
pub(crate) use handle::MutableProcessHandle;
pub use handle::ProcessHandle;
pub use handle::ProcessId;
pub(crate) use handle::UniqueProcessHandle;
2 changes: 1 addition & 1 deletion dscale/src/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use log::log_enabled;

use crate::time::Jiffies;

const K_PROGRESS_TIMES: usize = 20;
const K_PROGRESS_TIMES: usize = 100;

pub(crate) struct Bar {
bar: ProgressBar,
Expand Down
29 changes: 21 additions & 8 deletions dscale/src/simulation_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@
use std::{
cell::RefCell,
collections::{BTreeMap, HashMap},
rc::Rc,
};

use crate::{
ProcessHandle, ProcessId, Simulation,
network::BandwidthDescription,
process::UniqueProcessHandle,
process::MutableProcessHandle,
random::Seed,
time::Jiffies,
topology::{LatencyDescription, LatencyTopology},
topology::{GLOBAL_POOL, LatencyDescription, LatencyTopology},
};

fn init_logger() {
Expand Down Expand Up @@ -74,7 +75,7 @@ pub struct SimulationBuilder {
seed: Seed,
time_budget: Jiffies,
proc_id: usize,
pools: HashMap<String, Vec<(ProcessId, UniqueProcessHandle)>>,
pools: HashMap<String, Vec<(ProcessId, MutableProcessHandle)>>,
latency_topology: LatencyTopology,
bandwidth: BandwidthDescription,
}
Expand Down Expand Up @@ -151,15 +152,27 @@ impl SimulationBuilder {
name: &str,
size: usize,
) -> SimulationBuilder {
let pool = self.pools.entry(name.to_string()).or_default();
for _ in 0..size {
(0..size).for_each(|_| {
let id = self.proc_id;
self.proc_id += 1;
pool.push((id, Box::new(P::default())));
}
let handle = Rc::new(RefCell::new(P::default()));
self.add_to_pool::<P>(name, id, handle.clone());
self.add_to_pool::<P>(GLOBAL_POOL, id, handle.clone());
});

self
}

fn add_to_pool<P: ProcessHandle + Default + 'static>(
&mut self,
name: &str,
id: usize,
handle: MutableProcessHandle,
) {
let pool = self.pools.entry(name.to_string()).or_default();
pool.push((id, handle));
}

/// Sets the random seed for deterministic simulation execution.
///
/// The seed controls all random behavior in the simulation, including network
Expand Down Expand Up @@ -430,7 +443,7 @@ impl SimulationBuilder {
let mut ids = Vec::new();
for (id, handle) in pool {
ids.push(id);
procs.insert(id, RefCell::new(handle));
procs.insert(id, handle);
}
pool_listing.insert(name, ids);
}
Expand Down
4 changes: 4 additions & 0 deletions dscale/src/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ use crate::{ProcessId, random::Distributions};
pub(crate) type LatencyTopology = HashMap<(ProcessId, ProcessId), Distributions>;
pub(crate) type PoolListing = HashMap<String, Vec<ProcessId>>;

/// Default pool for all processes within simulation.
/// Broadcasts by default use this pool.
pub const GLOBAL_POOL: &str = "global_pool";

/// Describes network latency characteristics for different process relationships.
///
/// `LatencyDescription` allows you to configure different latency patterns
Expand Down