diff --git a/examples/assets/chrome-capture-2026-03-09_TrafficModel_Mesa.gif b/examples/assets/chrome-capture-2026-03-09_TrafficModel_Mesa.gif new file mode 100644 index 000000000..14bcdea0a Binary files /dev/null and b/examples/assets/chrome-capture-2026-03-09_TrafficModel_Mesa.gif differ diff --git a/examples/smart_traffic_lights/README.md b/examples/smart_traffic_lights/README.md new file mode 100644 index 000000000..6fe306872 --- /dev/null +++ b/examples/smart_traffic_lights/README.md @@ -0,0 +1,26 @@ +# Smart Traffic Lights + +## Summary + +An optimization simulation where traffic light agents use local information to minimize vehicle wait times at intersections. + +![Simulation of Smart Traffic Controller](./assets/chrome-capture-2026-03-09_TrafficModel_Mesa.gif) + +## Installation + +To install the dependencies use pip and the requirements.txt in this directory. e.g. + +``` + $ pip install -r requirements.txt +``` + + + +Then open your browser to [http://127.0.0.1:8521/](http://127.0.0.1:8521/), select the model parameters, press Reset, then Start. + +## Files + +* ``smart_traffic_light/agents.py``: Defines the CarAgent, the TrafficLightAgent and the IntersectionController classes. +* ``smart_traffic_light/model.py``: Defines the Traffic model and the DataCollector functions. +* ``run_example.py``: Script to compare waiting time in traffic using smart and normal traffic light controller. +* ``app.py``: Visualization script on Solara. diff --git a/examples/smart_traffic_lights/app.py b/examples/smart_traffic_lights/app.py new file mode 100644 index 000000000..6172d9fb3 --- /dev/null +++ b/examples/smart_traffic_lights/app.py @@ -0,0 +1,80 @@ +from typing import Any + +import mesa +from mesa.visualization import SolaraViz, make_plot_component, make_space_component +from smart_traffic_lights.agents import ( + CarAgent, + Direction, + LightState, + TrafficLightAgent, +) +from smart_traffic_lights.model import TrafficModel + + +def traffic_portrayal(agent: mesa.Agent) -> dict[str, Any]: + """ + Determines how agents are drawn on the grid. + + - Cars: Blue for East, Purple for North. + - Lights: Circle markers, Red or Green based on state. + - Controller: Hidden (has no position). + """ + + if isinstance(agent, TrafficLightAgent): + return { + "color": "tab:green" if agent.state == LightState.GREEN else "tab:red", + "marker": "o", # Circle for lights + "size": 100, + "zorder": 1, # Ensure lights are drawn above cars + "alpha": 1.0, + } + if isinstance(agent, CarAgent): + return { + "color": "tab:blue" if agent.direction == Direction.EAST else "tab:purple", + "marker": "s", # Square for cars + "zorder": 0, # Ensure lights are drawn above cars + "size": 40, + } + return {} + + +# Define interactive parameters for Solara UI +model_params = { + "width": mesa.visualization.Slider( + label="Width of the grid", value=20, min=5, max=40, step=1 + ), + "height": mesa.visualization.Slider( + label="Height of the grid", value=20, min=5, max=40, step=1 + ), + "num_cars_east": mesa.visualization.Slider( + label="Number of cars going east", value=8, min=1, max=20, step=1 + ), + "num_cars_north": mesa.visualization.Slider( + label="Number of cars going north", value=8, min=1, max=20, step=1 + ), + "smart_lights": mesa.visualization.Slider( + label="Smart Lights (0=Off, 1=On)", value=1, min=0, max=1, step=1 + ), +} + +# Create the Grid View +space_component = make_space_component(traffic_portrayal) + +# Create the Wait Time Chart +wait_time_chart = make_plot_component({"Total_Red_Light_Wait_Time": "tab:red"}) + +initial_model = TrafficModel() + +# Instantiate the Solara Visualization Page +app = SolaraViz( + model=initial_model, + model_params=model_params, + components=[ + space_component, + wait_time_chart, + ], + name="Smart Traffic Simulation", +) + + +# app diff --git a/examples/smart_traffic_lights/assets/chrome-capture-2026-03-09_TrafficModel_Mesa.gif b/examples/smart_traffic_lights/assets/chrome-capture-2026-03-09_TrafficModel_Mesa.gif new file mode 100644 index 000000000..14bcdea0a Binary files /dev/null and b/examples/smart_traffic_lights/assets/chrome-capture-2026-03-09_TrafficModel_Mesa.gif differ diff --git a/examples/smart_traffic_lights/requirements.txt b/examples/smart_traffic_lights/requirements.txt new file mode 100644 index 000000000..e3c431f16 --- /dev/null +++ b/examples/smart_traffic_lights/requirements.txt @@ -0,0 +1,7 @@ +mesa[viz]>=3.0 +networkx +numpy +pandas +enum +typing +matplotlib diff --git a/examples/smart_traffic_lights/run_example.py b/examples/smart_traffic_lights/run_example.py new file mode 100644 index 000000000..e73152396 --- /dev/null +++ b/examples/smart_traffic_lights/run_example.py @@ -0,0 +1,54 @@ +import numpy as np +from smart_traffic_lights.model import TrafficModel + +STEPS = 2000 + +width = 20 +height = 20 +num_cars_east = 8 +num_cars_north = 8 + + +print("Running Static Traffic Lights Simulation...") +model_static = TrafficModel(smart_lights=False) +for _ in range(STEPS): + model_static.step() + +print("Running Smart Traffic Lights Simulation...") +model_smart = TrafficModel(smart_lights=True) +for _ in range(STEPS): + model_smart.step() + +# Retrieve data +static_df = model_static.datacollector.get_model_vars_dataframe() +smart_df = model_smart.datacollector.get_model_vars_dataframe() + +# Calculate the percentage improvement +final_wait_static = static_df["Total_Red_Light_Wait_Time"].iloc[-1] +final_wait_smart = smart_df["Total_Red_Light_Wait_Time"].iloc[-1] + +improvement = np.round( + (final_wait_static - final_wait_smart) / final_wait_static * 100, 2 +) + +print("-" * 40) +print(f"Results after {STEPS} steps:") +print(f"Total Red Light Wait Time (Static Lights): {final_wait_static} steps") +print(f"Total Red Light Wait Time (Smart Lights) : {final_wait_smart} steps") +print( + f"Performance Improvement : {improvement}% reduction in red light wait time" +) +print("-" * 40) + +final_wait_static = static_df["Total_Wait_Time"].iloc[-1] +final_wait_smart = smart_df["Total_Wait_Time"].iloc[-1] + +improvement = np.round( + (final_wait_static - final_wait_smart) / final_wait_static * 100, 2 +) + +print("-" * 40) +print(f"Total Wait Time (Static Lights): {final_wait_static} steps") +print(f"Total Wait Time (Smart Lights) : {final_wait_smart} steps") +print(f"Performance Improvement : {improvement}% reduction in total wait time") +print("-" * 40) diff --git a/examples/smart_traffic_lights/smart_traffic_lights/agents.py b/examples/smart_traffic_lights/smart_traffic_lights/agents.py new file mode 100644 index 000000000..7b8c2a3cf --- /dev/null +++ b/examples/smart_traffic_lights/smart_traffic_lights/agents.py @@ -0,0 +1,160 @@ +import enum + +import mesa + + +class Direction(enum.Enum): + EAST = (1, 0) + NORTH = (0, 1) + + +class LightState(enum.Enum): + RED = 0 + GREEN = 1 + + +class TrafficLightAgent(mesa.Agent): + """ + An agent representing a single traffic light. + + Attributes: + state (LightState): The current state of the light (RED or GREEN). + direction (Direction): The flow of traffic this light controls. + """ + + def __init__(self, model: mesa.Model, state: LightState, direction: Direction): + super().__init__(model) + self.state = state + self.direction = direction + + def step(self): + # Traffic lights are passive; the Controller changes their state. + pass + + +class CarAgent(mesa.Agent): + """ + An agent representing a car in the grid. + + Attributes: + direction (Direction): The direction the car is traveling. + wait_time (int): Accumulator for time steps spent not moving. + """ + + def __init__(self, model: mesa.Model, direction: Direction): + super().__init__(model) + self.direction = direction + self.total_wait_time = 0 + self.red_light_wait_time = 0 + + def step(self): + """ + Determines if the car can move forward based on obstacles and lights. + """ + + # Calculate the next coordinate based on direction + next_x = (self.pos[0] + self.direction.value[0]) % self.model.grid.width + next_y = (self.pos[1] + self.direction.value[1]) % self.model.grid.height + next_pos = (next_x, next_y) + + can_move = True + stopped_by_red_light = False + cell_contents = self.model.grid.get_cell_list_contents([next_pos]) + + for obj in cell_contents: + if isinstance(obj, CarAgent): + can_move = False + break + elif isinstance(obj, TrafficLightAgent): + # Only stop if the light controls our direction and is red + if obj.direction == self.direction and obj.state == LightState.RED: + can_move = False + stopped_by_red_light = True + break + + if can_move: + self.model.grid.move_agent(self, next_pos) + else: + self.total_wait_time += 1 + if stopped_by_red_light: + self.red_light_wait_time += 1 + + +class IntersectionController(mesa.Agent): + """ + A meta-agent that controls the traffic lights at an intersection. + + Attributes: + smart (bool): If True, uses queue density to toggle. If False, uses fixed timer. + lights (List[TrafficLightAgent]): The lights managed by this controller. + """ + + def __init__( + self, + model: mesa.Model, + smart: bool, + lights: list[TrafficLightAgent], + sensor_range: int = 5, + static_wait=15, + ): + super().__init__(model) + self.smart = smart + self.static_wait = static_wait + self.lights = {light.direction: light for light in lights} # Dictionary + self.sensor_range = sensor_range + self.timer = 0 + self.cooldown = 2 # Minimum steps before a light can change again + + def get_queue_density(self, light: TrafficLightAgent) -> int: + """ + Calculates the number of cars waiting in the sensor zone approaching the light. + """ + count = 0 + # Look backwards from the light based on the direction it controls + dx, dy = light.direction.value + for i in range(1, self.sensor_range + 1): + check_x = (light.pos[0] - dx * i) % self.model.grid.width + check_y = (light.pos[1] - dy * i) % self.model.grid.height + + check_pos = (check_x, check_y) + if any( + isinstance(a, CarAgent) + for a in self.model.grid.get_cell_list_contents([check_pos]) + ): + count += 1 + return count + + def toggle_lights(self): + """ + Switches all lights managed by the controller. + """ + for light in self.lights.values(): + light.state = ( + LightState.GREEN if light.state == LightState.RED else LightState.RED + ) + self.timer = 0 + + def step(self): + self.timer += 1 + + if not self.smart: + # Static: Toggle every fixed interval + if self.timer >= self.static_wait: + self.toggle_lights() + else: + # Smart: Toggle based on dynamic queue density + if self.timer >= self.cooldown: + # Select lights by direction to find queue lengths + east_light = self.lights[Direction.EAST] + north_light = self.lights[Direction.NORTH] + + east_queue = self.get_queue_density(east_light) + north_queue = self.get_queue_density(north_light) + + # If the current green light has a smaller queue than the red light, toggle + if ( + east_light.state == LightState.GREEN and north_queue > east_queue + ) or ( + north_light.state == LightState.GREEN and east_queue > north_queue + ): + self.toggle_lights() diff --git a/examples/smart_traffic_lights/smart_traffic_lights/model.py b/examples/smart_traffic_lights/smart_traffic_lights/model.py new file mode 100644 index 000000000..f2f75821e --- /dev/null +++ b/examples/smart_traffic_lights/smart_traffic_lights/model.py @@ -0,0 +1,100 @@ +import mesa + +from .agents import ( + CarAgent, + Direction, + IntersectionController, + LightState, + TrafficLightAgent, +) + + +def track_total_wait_time(model: mesa.Model) -> int: + """ + Helper function for the DataCollector to compute total wait time. + """ + return sum(a.total_wait_time for a in model.agents if isinstance(a, CarAgent)) + + +def track_red_light_wait_time(model: mesa.Model) -> int: + """ + Helper function for the DataCollector to compute total wait time. + """ + return sum(a.red_light_wait_time for a in model.agents if isinstance(a, CarAgent)) + + +class TrafficModel(mesa.Model): + """ + The simulation model for the traffic network. + + Attributes: + grid (mesa.space.MultiGrid): The spatial grid. + """ + + def __init__( + self, + width: int = 20, + height: int = 20, + num_cars_east: int = 8, + num_cars_north: int = 8, + smart_lights: bool = False, + ): + super().__init__() + self.grid = mesa.space.MultiGrid(width, height, torus=True) + + # Setup intersection (center of the grid) + center_x, center_y = width // 2, height // 2 + + # Create Traffic Lights + light_east = TrafficLightAgent(self, LightState.GREEN, Direction.EAST) + self.grid.place_agent(light_east, (center_x - 1, center_y)) + + light_north = TrafficLightAgent(self, LightState.RED, Direction.NORTH) + self.grid.place_agent(light_north, (center_x, center_y - 1)) + + # Create Meta-Agent Controller + IntersectionController( + self, smart=smart_lights, lights=[light_east, light_north], sensor_range=6 + ) + + # Spawn Cars + + # Spawn East-bound cars on the horizontal road + for _ in range(num_cars_east): + car = CarAgent(self, Direction.EAST) + self.grid.place_agent(car, (self.random.randrange(width), center_y)) + + # Spawn North-bound cars on the vertical road + for _ in range(num_cars_north): + car = CarAgent(self, Direction.NORTH) + self.grid.place_agent(car, (center_x, self.random.randrange(height))) + + # Setup Data Collection + self.datacollector = mesa.DataCollector( + model_reporters={ + "Total_Wait_Time": track_total_wait_time, + "Total_Red_Light_Wait_Time": track_red_light_wait_time, + } + ) + + def step(self): + # Get cars by direction + all_cars = self.agents_by_type[CarAgent] + east_cars = all_cars.select(lambda a: a.direction == Direction.EAST) + north_cars = all_cars.select(lambda a: a.direction == Direction.NORTH) + + # Sort front-to-back: Cars with higher x (for East) or higher y (for North) + # are "further ahead" and should move first to clear the path. + sorted_east = sorted(east_cars, key=lambda a: a.pos[0], reverse=True) + sorted_north = sorted(north_cars, key=lambda a: a.pos[1], reverse=True) + + # Execute movement in the specific order + for car in sorted_east: + car.step() + for car in sorted_north: + car.step() + + # Controller steps last to perceive the final positions of the cars + self.agents_by_type[IntersectionController].do("step") + + self.datacollector.collect(self)