For a stable M/M/c system (traffic intensity $\rho = \lambda/(c\mu) < 1$), closed-form results exist via the Erlang C formula.
class Customer(Agent):
    """A customer flowing through the queuing system.

    Instantiated on arrival and removed once service finishes.
    Records timestamps so wait and system times can be derived.
    """

    def __init__(self, model):
        super().__init__(model)
        # Stamp the arrival; service timestamps are filled in later
        # by the Server that eventually picks this customer up.
        self.arrival_time = model.time
        self.service_start_time = None
        self.service_end_time = None

    @property
    def wait_time(self):
        """Queue delay before service began, or None if not yet served."""
        started = self.service_start_time
        return None if started is None else started - self.arrival_time

    @property
    def system_time(self):
        """Total sojourn time (wait + service), or None if still in system."""
        finished = self.service_end_time
        return None if finished is None else finished - self.arrival_time
class Server(Agent):
    """A server that pulls customers from the shared queue when idle.

    Server-centric design: on finishing a job the server itself
    inspects the model's queue and begins serving the next customer.
    """

    def __init__(self, model, service_rate):
        super().__init__(model)
        self.service_rate = service_rate
        self.current_customer = None
        self.busy_time = 0.0  # cumulative time spent actually serving
        self._service_started_at = None

    @property
    def is_idle(self):
        """True when no customer is currently in service."""
        return self.current_customer is None

    def start_service(self, customer):
        """Begin serving *customer* and schedule the completion event."""
        now = self.model.time
        customer.service_start_time = now
        self.current_customer = customer
        self._service_started_at = now

        # Exponential service time with mean 1/mu.
        duration = self.model.rng.exponential(1.0 / self.service_rate)
        self.model.schedule_event(self._complete_service, after=duration)

    def _complete_service(self):
        """Finish the current job, then pull the next waiting customer."""
        finished = self.current_customer
        finished.service_end_time = self.model.time
        self.busy_time += self.model.time - self._service_started_at

        # Record metrics before the agent is removed from the model.
        self.model._record_departure(finished)
        finished.remove()

        # Go idle, then actively pull from the queue (server-centric).
        self.current_customer = None
        self._service_started_at = None
        if self.model.queue:
            self.start_service(self.model.queue.popleft())
def analytical_mmc(arrival_rate, service_rate, c):
    """Compute analytical M/M/c steady-state metrics using the Erlang C formula.

    Args:
        arrival_rate: Mean arrival rate (lambda).
        service_rate: Mean service rate per server (mu).
        c: Number of servers.

    Returns:
        dict with analytical metrics, or None if system is unstable (rho >= 1).
    """
    rho = arrival_rate / (c * service_rate)  # traffic intensity
    if rho >= 1.0:
        # Queue grows without bound: no steady state exists.
        return None

    offered_load = arrival_rate / service_rate  # a = lambda / mu

    # P(0): probability the system is empty.
    tail = (offered_load**c / factorial(c)) * (1.0 / (1.0 - rho))
    head = sum(offered_load**k / factorial(k) for k in range(c))
    p0 = 1.0 / (head + tail)

    # Erlang C: probability an arriving customer must wait.
    erlang_c = tail * p0

    mean_wait = erlang_c / (c * service_rate * (1.0 - rho))
    return {
        "utilization": rho,
        "avg_wait_time": mean_wait,
        "avg_system_time": mean_wait + 1.0 / service_rate,
        "avg_queue_length": erlang_c * rho / (1.0 - rho),
        "prob_queuing": erlang_c,
    }
class MMcQueue(Model):
    """M/M/c queuing system driven purely by discrete events.

    Args:
        arrival_rate: Mean arrival rate (λ). Customers per time unit.
        service_rate: Mean service rate per server (μ).
        n_servers: Number of servers (c).
        rng: Random number generator seed.
    """

    def __init__(self, arrival_rate=1.0, service_rate=0.5, n_servers=2, **kwargs):
        super().__init__(**kwargs)
        self.arrival_rate = arrival_rate
        self.service_rate = service_rate
        self.n_servers = n_servers

        # FIFO waiting line shared by all servers.
        self.queue = deque()

        # Running totals for steady-state metrics.
        self.customers_served = 0
        self.total_wait_time = 0.0
        self.total_system_time = 0.0

        # The c servers.
        self.servers = [Server(self, service_rate) for _ in range(n_servers)]

        # Pure DES: disable Mesa's default step schedule.
        self._default_schedule.stop()

        # Poisson arrivals via exponential inter-arrival times.
        self.schedule_recurring(
            self._customer_arrival,
            Schedule(
                interval=lambda m: m.rng.exponential(1.0 / m.arrival_rate),
                start=0.0,
            ),
        )

    def _customer_arrival(self):
        """Create a customer; serve immediately if any server is free."""
        customer = Customer(self)
        idle_server = next((s for s in self.servers if s.is_idle), None)
        if idle_server is not None:
            idle_server.start_service(customer)
        else:
            self.queue.append(customer)

    def _record_departure(self, customer):
        """Accumulate metrics for a customer leaving the system."""
        self.customers_served += 1
        self.total_wait_time += customer.wait_time
        self.total_system_time += customer.system_time

    # --- Metrics ---

    @property
    def avg_wait_time(self):
        """Mean queueing delay over all served customers."""
        served = self.customers_served
        return self.total_wait_time / served if served else 0.0

    @property
    def avg_system_time(self):
        """Mean total time (wait + service) over all served customers."""
        served = self.customers_served
        return self.total_system_time / served if served else 0.0

    @property
    def server_utilization(self):
        """Fraction of total server capacity spent busy so far."""
        if self.time == 0:
            return 0.0
        busy = sum(s.busy_time for s in self.servers)
        return busy / (self.n_servers * self.time)

    @property
    def current_queue_length(self):
        """Number of customers currently waiting (not in service)."""
        return len(self.queue)
if __name__ == "__main__":
    # Support running both as a package module and as a plain script.
    try:
        from .analytical_mmc import analytical_mmc
    except ImportError:
        from analytical_mmc import analytical_mmc

    ARRIVAL_RATE = 2.0
    SERVICE_RATE = 1.0
    N_SERVERS = 3
    SIM_TIME = 10_000.0

    model = MMcQueue(
        arrival_rate=ARRIVAL_RATE,
        service_rate=SERVICE_RATE,
        n_servers=N_SERVERS,
        rng=42,
    )
    model.run_until(SIM_TIME)

    analytical = analytical_mmc(ARRIVAL_RATE, SERVICE_RATE, N_SERVERS)

    print(f"M/M/{N_SERVERS} Queue (λ={ARRIVAL_RATE}, μ={SERVICE_RATE}, T={SIM_TIME})")
    print(f"Customers served: {model.customers_served}\n")

    # Bug fix: analytical_mmc returns None for an unstable system
    # (rho >= 1); subscripting it crashed with a TypeError. Guard it.
    if analytical is None:
        print("System is unstable (rho >= 1); no analytical steady state.")
    else:
        print(f"{'Metric':<25} {'Simulated':>12} {'Analytical':>12}")
        print("-" * 51)
        # (label, simulated value, analytical value) rows.
        rows = [
            ("Server utilization", model.server_utilization, analytical["utilization"]),
            ("Avg wait time", model.avg_wait_time, analytical["avg_wait_time"]),
            ("Avg system time", model.avg_system_time, analytical["avg_system_time"]),
        ]
        for label, simulated, exact in rows:
            print(f"{label:<25} {simulated:>12.4f} {exact:>12.4f}")