End-to-end reference stack for mission intake, prioritization, and assignment across a simulated AMR fleet: a FastAPI control plane with SQLite persistence, ROS 2 fleet manager and simulator nodes, Eclipse Mosquitto for pub/sub, and a FastAPI + D3 operator dashboard.
Current required checks for protected branches:
- API Unit Tests (Python 3.10)
- API Unit Tests (Python 3.11)
- Dashboard Unit Tests (Python 3.11)
- MQTT Config Tests
- ROS2 Package Tests (Python)
- ROS2 Runtime Tests (Humble)
- Docker Compose Build Validation
- Integration PR Smoke (Health + Failure Paths)
The repository is organized into five main components:
API Service:
- Runs FastAPI mission ingestion and mission lifecycle state management
- Supports:
  - POST /missions
  - GET /missions
  - PATCH /missions/{mission_id}
- Stores mission records in SQLite
- Publishes mission-created and mission-updated events via MQTT
- Implemented: REST endpoints, transition validation, SQLite repository, MQTT publish hooks, structured logging with request correlation, lifespan-based startup/shutdown, protected MQTT metrics route (/mqtt/metrics), mission event history endpoint, analytics summary endpoint, extensible mission types (optionally allowlisted), extensible mission constraints (maxDistance + customConstraints)
- Implemented: first-class robot registration endpoints (register/heartbeat/active list)
- Planned / future: Authentication/authorization, persistent production DB migration tooling, comprehensive source attribution for non-fleet-manager update paths
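The transition validation listed for the API can be pictured as a lookup table. The following is a minimal sketch, assuming the lifecycle PENDING -> ASSIGNED -> IN_PROGRESS -> COMPLETED/FAILED plus the requeue back to PENDING described elsewhere in this README. `ALLOWED_TRANSITIONS` and `is_valid_transition` are hypothetical names, and the real rule set may allow more or fewer transitions.

```python
# Hypothetical transition table; the actual API rules may differ.
ALLOWED_TRANSITIONS = {
    "PENDING": {"ASSIGNED"},
    "ASSIGNED": {"IN_PROGRESS", "PENDING"},  # PENDING covers stale-robot requeue
    "IN_PROGRESS": {"COMPLETED", "FAILED", "PENDING"},
    "COMPLETED": set(),  # terminal
    "FAILED": set(),     # terminal
}


def is_valid_transition(current: str, requested: str) -> bool:
    """Return True when the requested status is reachable from the current one."""
    return requested in ALLOWED_TRANSITIONS.get(current, set())


if __name__ == "__main__":
    print(is_valid_transition("PENDING", "ASSIGNED"))
    print(is_valid_transition("COMPLETED", "PENDING"))
```

A table like this keeps the rules in one place, so a PATCH handler only needs a single membership check before persisting a status change.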
Fleet Manager:
- Subscribes to RobotStatus updates
- Polls REST API every second for pending missions
- Subscribes to MQTT mission-created events (dual trigger)
- Ranks robots and dispatches missions via shared ROS2 action endpoint
- Uses in-flight dedupe and dead-letter capture for patch failures
- Implemented: Polling + retry, MQTT listener, ranking, action dispatch, in-flight idempotency, status patching, bounded dead-letter replay retries, health/metrics publishing topic, advanced assignment constraints (capability/zone/allowed robot IDs/deadline/min battery/maxDistance), fairness weighting, custom constraint rules (robot-field operators), custom ranking formula support (global env and per-mission override), stale robot liveness detection with automatic mission requeue (ASSIGNED/IN_PROGRESS -> PENDING)
- Implemented: optional registration-enforced assignment mode via REQUIRE_ROBOT_REGISTRATION
- Planned / future: Durable dead-letter persistence across process restarts
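To illustrate the ranking step, here is a minimal sketch that scores candidate robots with the same weights as the example per-mission formula shown later in this README. The `RobotScores` type, the function names, and the assumption that each score is already normalized to the range 0 to 1 are illustrative only; how the Fleet Manager actually computes the component scores is not shown here.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class RobotScores:
    robot_id: str
    battery_score: float   # assumed normalized: 1.0 = full battery
    distance_score: float  # assumed normalized: 1.0 = closest to the first waypoint
    fairness_score: float  # assumed normalized: 1.0 = least recently assigned


def rank_score(r: RobotScores) -> float:
    # Weights copied from the README example formula:
    # (battery_score*0.5) + (distance_score*0.35) + (fairness_score*0.15)
    return r.battery_score * 0.5 + r.distance_score * 0.35 + r.fairness_score * 0.15


def rank_robots(candidates: List[RobotScores]) -> List[RobotScores]:
    """Return candidates ordered best-first by weighted score."""
    return sorted(candidates, key=rank_score, reverse=True)


if __name__ == "__main__":
    fleet = [
        RobotScores("AMR-001", 0.9, 0.4, 0.5),
        RobotScores("AMR-003", 0.6, 0.9, 0.8),
    ]
    for robot in rank_robots(fleet):
        print(robot.robot_id, round(rank_score(robot), 3))
```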
Robot Simulator:
- Simulates 3+ robots at 10 Hz
- Publishes RobotStatus telemetry
- Hosts shared AssignMission action server
- Simulates mission execution with progress feedback and result
- Mirrors robot telemetry to MQTT for dashboard consumption
- Implemented: Robot simulation loop, battery model, movement model, action routing by robot_id, position/time-based completion criteria, elapsed time-stepping in action execution, state-aware mission abort logic (ERROR/non-expected state), deterministic timing controls, optional failure injection flags, MQTT telemetry bridge, dynamic robot registration controls
- Planned / future: More realistic motion constraints and map-aware navigation
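The deterministic timing controls and optional failure injection flags can be combined so that injected failures are reproducible. The sketch below assumes that a seed (as in SIM_RANDOM_SEED), a probability (as in SIM_FAIL_PROBABILITY), and a set of forced mission IDs (as in SIM_FORCE_FAIL_MISSION_IDS) are already parsed; `should_fail` is a hypothetical helper, not the simulator's actual function.

```python
import random
from typing import AbstractSet


def should_fail(
    mission_id: str,
    rng: random.Random,
    fail_probability: float = 0.0,
    forced_fail_ids: AbstractSet[str] = frozenset(),
) -> bool:
    """Decide whether to inject a failure for this mission."""
    if mission_id in forced_fail_ids:
        return True  # forced failures ignore the probability
    return rng.random() < fail_probability


if __name__ == "__main__":
    rng = random.Random(42)  # a fixed seed makes the sequence repeatable
    print([should_fail(f"m-{i}", rng, fail_probability=0.2) for i in range(5)])
```

Passing the generator in explicitly, rather than using the module-level `random` functions, keeps runs with the same seed identical.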
MQTT Broker (Eclipse Mosquitto):
- Central pub/sub transport for mission and robot events
- Topics:
  - fleet/missions/created
  - fleet/missions/updated
  - fleet/robots/status
- Implemented: Broker integration, topic wiring, reconnect handling in clients, strict config validation script and CI enforcement, expanded config tests for option combinations
- Planned / future: ACL policies, retained message strategy and topic governance
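A subscriber typically routes each incoming message by topic. The following is a minimal, broker-independent sketch of that routing for the three topics listed above; the handler names and the payload fields (`id`, `robotId`) are assumptions for illustration, and wiring `dispatch` to an actual MQTT client callback is left out.

```python
import json
from typing import Any, Callable, Dict, Optional

TOPIC_HANDLERS: Dict[str, Callable[[dict], Any]] = {}


def on_topic(topic: str):
    """Decorator that registers a handler for one exact topic."""
    def register(fn):
        TOPIC_HANDLERS[topic] = fn
        return fn
    return register


@on_topic("fleet/missions/created")
def handle_created(payload: dict):
    return ("created", payload.get("id"))  # "id" is an assumed field name


@on_topic("fleet/missions/updated")
def handle_updated(payload: dict):
    return ("updated", payload.get("id"))


@on_topic("fleet/robots/status")
def handle_robot_status(payload: dict):
    return ("status", payload.get("robotId"))  # "robotId" is an assumed field name


def dispatch(topic: str, raw: bytes) -> Optional[Any]:
    """Decode a JSON payload and call the matching handler, if any."""
    handler = TOPIC_HANDLERS.get(topic)
    if handler is None:
        return None  # unknown topics are ignored
    return handler(json.loads(raw))
```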
Dashboard:
- Browser dashboard for mission creation, mission timeline, and operational charts
- Proxies to API and subscribes to robot telemetry through backend MQTT subscriber
- Includes 12 D3 visualizations for map, throughput, latency, utilization, ranking insights, and more
- Implemented: Mission create/list UI, typed response contracts, centralized upstream error categorization, subscriber callbacks/health counters, chart rendering, telemetry-backed chart bindings, realtime WebSocket updates, mission history drill-down panel, analytics summary integration, mission timeline filter/sort controls, advanced mission composer fields for custom constraints and ranking formula override, chart legends and axis labels for readability
- Planned / future: Authentication, fleet-manager dead-letter feed integration
The canonical architecture source is systemarchitecture.mmd at the repository root. It includes the latest reliability and observability additions (protected diagnostics route, mission history and analytics summary endpoints, dashboard WebSocket live stream, dead-letter replay, Fleet Manager health/source counters, simulator dynamic registration, and determinism/failure injection).
%% Implemented vs planned details are described in the sections above.
graph TD
subgraph Client_and_UI ["Client and dashboard host"]
U[Operator / browser]
D[Dashboard: mission UI, D3 charts, API proxy, MQTT ingest, WebSocket stream, mission history — operator auth planned]
end
subgraph Control_Plane ["Control plane host"]
API[REST API: FastAPI, mission lifecycle, SQLite, MQTT events, history and analytics — production DB and auth planned]
DB[("SQLite")]
MQ[Mosquitto: mission and robot topics — TLS, broker auth, ACLs planned]
end
subgraph Edge_Robotics ["Robot / ROS 2 host"]
FM[Fleet manager: REST poll, MQTT trigger, ranking, dispatch, PATCH lifecycle, health metrics — durable dead letters planned]
RS[Robot simulator: 10 Hz telemetry, shared AssignMission server, execution feedback — richer physics planned]
end
U -->|HTTP| D
D -->|REST| API
API --> DB
API -->|publish| MQ
RS -->|telemetry| MQ
MQ --> D
FM -->|poll / PATCH| API
D -->|WebSocket| U
MQ --> FM
FM -->|AssignMission| RS
D -->|history / analytics| API
Implemented features:
- API Service:
  - Mission POST/GET/PATCH
  - Transition rule validation
  - SQLite persistence
  - MQTT mission event publishing
  - Lifespan-based startup/shutdown resource management
  - Request ID propagation middleware
  - Protected MQTT diagnostics metrics route
  - Mission history timeline endpoint (/missions/{id}/history)
  - Analytics summary endpoint (/missions/analytics/summary)
  - Robot registration endpoints (/robots/register, /robots/{id}/heartbeat, /robots/active)
  - Extensible mission types with optional allowlist (MISSION_TYPE_ALLOWLIST)
  - Extensible mission constraints (maxDistance + customConstraints)
- Fleet Manager:
  - 1-second mission polling with retry
  - MQTT mission trigger subscription
  - Ranking engine with eligibility filters
  - In-flight dedupe for dual-trigger safety
  - Action dispatch and status lifecycle patching
  - Stale/inactive robot detection for in-flight missions with automatic requeue for reassignment
  - Dead-letter replay with bounded retries
  - Runtime health metrics publisher (/fleet_manager/health)
  - Trigger-source counters in health payload (REST_POLL, MQTT_EVENT, MANUAL)
  - Active-robot registry sync from API and optional registration-only dispatch mode
  - Constraint-aware assignment filters (capability/zone/allowed robot IDs/deadlines/min battery)
  - Fairness-aware ranking weight
  - Custom constraint rule filtering via CUSTOM_CONSTRAINT_RULES_JSON
  - Custom ranking formula support via RANKING_FORMULA and per-mission customConstraints.rankingFormula
- Robot Simulator:
  - Multi-robot 10 Hz telemetry publication
  - Shared action endpoint with robot_id routing
  - Mission progress feedback and result handling based on target reach
  - Battery/movement simulation baseline
  - Determinism controls (seed and timing env vars)
  - Optional failure injection flags
  - Dynamic robot registration controls (env-driven)
  - Elapsed time-stepping and state-aware action loop behavior to reduce false timeout failures
- Dashboard:
  - Mission creation and monitoring UI
  - Typed API response models
  - Centralized upstream error handling (timeout/connection/upstream-status)
  - MQTT subscriber connect/disconnect/message metrics
  - D3 visualization suite
  - WebSocket live stream endpoint and frontend consumer
  - Mission history drill-down panel and analytics summary ribbon
  - Mission timeline filters and sort controls
  - Advanced mission form fields for custom constraints and per-mission ranking formula override
  - Visualization axis titles and legends
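The in-flight dedupe listed for the Fleet Manager guards against the same mission arriving from both the REST poll and the MQTT trigger. A minimal sketch of that idea, assuming a simple in-memory registry (`InFlightRegistry` is a hypothetical name, and the real implementation may track more state):

```python
import threading


class InFlightRegistry:
    """Tracks mission IDs currently being dispatched."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._in_flight = set()

    def try_claim(self, mission_id: str) -> bool:
        """Return True only for the first caller that claims this mission."""
        with self._lock:
            if mission_id in self._in_flight:
                return False  # another trigger is already handling it
            self._in_flight.add(mission_id)
            return True

    def release(self, mission_id: str) -> None:
        """Free the mission once dispatch finishes or fails."""
        with self._lock:
            self._in_flight.discard(mission_id)


if __name__ == "__main__":
    registry = InFlightRegistry()
    print(registry.try_claim("m-1"))  # claimed by the REST poll path
    print(registry.try_claim("m-1"))  # the MQTT path sees it is taken
```

The lock matters because the poll loop and the MQTT callback may run on different threads.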
Planned features:
- API Service:
  - Authentication/authorization
  - Expanded analytics dimensions and drill-down aggregations
  - Production-grade persistent DB strategy
- Fleet Manager:
  - Durable dead-letter queue (disk-backed)
- Robot Simulator:
  - Realistic kinematics and map constraints
- Dashboard:
  - User auth and role controls
  - Richer historical analytics exploration UX (custom ranges/filters)
- Two machines minimum (three recommended):
  - Management laptop (optional) for API calls and browser
  - Control Plane host for API + Dashboard + Mosquitto
  - Edge Robotics host for Fleet Manager + Robot Simulator
- Docker Desktop or Docker Engine with Compose support on each host
- All hosts must be network reachable
Default exposed ports in docker-compose are:
- API: 8000
- Dashboard: 8080
- MQTT: 1883
These are default mappings, not strict requirements. You can change host-side ports in docker-compose.yml, for example:
- "9000:8000" to expose API on host port 9000
- "9090:8080" to expose Dashboard on host port 9090
- "1884:1883" to expose MQTT on host port 1884
If you change ports, also update client URLs and environment variables accordingly (for example MISSION_API_BASE_URL, DASHBOARD_API_BASE_URL, MQTT_BROKER_PORT).
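As a small illustration of keeping client settings in step with remapped ports, the sketch below derives the values from one host and the chosen host-side ports. `client_settings` is a hypothetical helper; only the environment variable names and default ports come from this README.

```python
from typing import Dict


def client_settings(
    host: str,
    api_port: int = 8000,
    dashboard_port: int = 8080,
    mqtt_port: int = 1883,
) -> Dict[str, str]:
    """Build client-facing values from the host-side port mappings."""
    api_base = f"http://{host}:{api_port}"
    return {
        "MISSION_API_BASE_URL": api_base,
        "DASHBOARD_API_BASE_URL": api_base,
        "MQTT_BROKER_HOST": host,
        "MQTT_BROKER_PORT": str(mqtt_port),
        # Not an environment variable: the URL an operator opens in a browser.
        "dashboard_url": f"http://{host}:{dashboard_port}",
    }


if __name__ == "__main__":
    # Example: API remapped with "9000:8000", MQTT remapped with "1884:1883".
    for key, value in client_settings("192.168.0.253", api_port=9000, mqtt_port=1884).items():
        print(f"{key}={value}")
```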
Newly supported configuration (see root .env.example):
- API
  - API_DIAGNOSTICS_TOKEN: protects GET /mqtt/metrics via X-Diagnostics-Token header
  - ROBOT_REGISTRATION_DEFAULT_TTL_SEC
  - ROBOT_REGISTRATION_MAX_TTL_SEC
  - LOG_LEVEL, LOG_FORMAT, LOG_FORCE: structured logging controls
- MQTT publisher hardening (API)
  - MQTT_PUBLISH_ACK_TIMEOUT_SEC
  - MQTT_PUBLISH_MAX_RETRIES
  - MQTT_PUBLISH_RETRY_BACKOFF_SEC
  - MQTT_OFFLINE_QUEUE_MAXSIZE
  - MQTT_TLS_ENABLED, MQTT_TLS_CA_CERTS, MQTT_TLS_CERTFILE, MQTT_TLS_KEYFILE, MQTT_TLS_INSECURE
- Fleet Manager
  - DEAD_LETTER_RETRY_MAX_ATTEMPTS
  - DEAD_LETTER_RETRY_INTERVAL_SEC
  - FLEET_MANAGER_HEALTH_TOPIC
  - REQUIRE_ROBOT_REGISTRATION
  - ROBOT_TELEMETRY_STALE_SEC
  - ROBOT_TELEMETRY_HEARTBEAT_INTERVAL_SEC
  - ROBOT_TELEMETRY_MISSED_HEARTBEATS
  - WEIGHT_FAIRNESS
  - RANKING_FORMULA
  - CUSTOM_CONSTRAINT_RULES_JSON
  - ROBOT_METADATA_JSON
- Mission model
  - MISSION_TYPE_ALLOWLIST
- Robot Simulator
  - SIM_TICK_SEC
  - SIM_ACTION_STEP_SEC
  - SIM_ACTION_TIMEOUT_SEC
  - SIM_SLEEP_SCALE
  - SIM_RANDOM_SEED
  - SIM_FAIL_PROBABILITY
  - SIM_FORCE_FAIL_MISSION_IDS
  - SIM_ROBOTS_JSON
  - SIM_DYNAMIC_REGISTRATION_ENABLED
  - SIM_DYNAMIC_REGISTRATION_INTERVAL_SEC
  - SIM_MAX_ROBOTS
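For the protected diagnostics route, a client has to send the token in the X-Diagnostics-Token header. The following is a minimal sketch using only the Python standard library; `build_metrics_request` is a hypothetical helper, and the response format of /mqtt/metrics is not assumed here.

```python
import urllib.request


def build_metrics_request(api_base: str, token: str) -> urllib.request.Request:
    """Prepare a GET request for the protected MQTT metrics route."""
    return urllib.request.Request(
        f"{api_base}/mqtt/metrics",
        headers={"X-Diagnostics-Token": token},
        method="GET",
    )


if __name__ == "__main__":
    # The token value below is a placeholder; use the API_DIAGNOSTICS_TOKEN you configured.
    req = build_metrics_request("http://localhost:8000", "change-me")
    print(req.get_method(), req.full_url)
    # To actually send it against a running stack:
    # with urllib.request.urlopen(req, timeout=5) as resp:
    #     print(resp.status, resp.read().decode())
```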
Pick exactly one path:
- Path A (recommended for demos): Single-machine run with all services on one host.
- Path B (production-like split): Multi-machine deployment with separate control-plane and edge hosts.
Do not run both paths on the same host at the same time unless you intentionally isolate projects and ports.
Path A: use this when all services run on one machine.
- From repository root, start everything:
docker compose up --build -d
- Verify all containers are healthy:
docker compose ps
- Validate API and dashboard (PowerShell, then Bash):
Invoke-RestMethod -Method Get -Uri "http://localhost:8000/health"
Invoke-RestMethod -Method Get -Uri "http://localhost:8080/api/health"
curl -fsS http://localhost:8000/health
curl -fsS http://localhost:8080/api/health
- Open dashboard:
http://localhost:8080
- Create a mission from the dashboard, PowerShell, or Bash:
PowerShell:
powershell -ExecutionPolicy Bypass -File scripts/post_sample_mission.ps1
Bash:
curl -fsS -X POST "http://localhost:8000/missions" \
-H "Content-Type: application/json" \
-d '{
"type": "PICKUP",
"priority": 4,
"waypoints": [
{
"id": 1,
"position": {"x": 4.5, "y": 2.0},
"action": "PICKUP",
"completed": false
}
],
"estimatedDuration": 240
}'
- Follow logs if needed:
docker compose logs -f
Path B, control-plane host. Example host: 192.168.0.253
- Clone/copy this repository to the host.
- Ensure Docker is installed:
docker --version
docker compose version
- Start control-plane services only:
docker compose up --build -d api dashboard mosquitto
- Validate services (PowerShell, then Bash):
Invoke-RestMethod -Method Get -Uri "http://192.168.0.253:8000/health"
Invoke-RestMethod -Method Get -Uri "http://192.168.0.253:8080/api/health"
curl -fsS http://192.168.0.253:8000/health
curl -fsS http://192.168.0.253:8080/api/health
- Open firewall ports on this host:
- 8000 (REST API)
- 8080 (Dashboard)
- 1883 (MQTT)
Path B, edge robotics host. Example host: 192.168.0.188
- Clone/copy this repository to edge host.
- Set environment values so edge services point to control plane:
PowerShell:
$env:MISSION_API_BASE_URL = "http://192.168.0.253:8000"
$env:MQTT_BROKER_HOST = "192.168.0.253"
$env:MQTT_BROKER_PORT = "1883"
$env:MQTT_ENABLED = "true"
Bash:
export MISSION_API_BASE_URL="http://192.168.0.253:8000"
export MQTT_BROKER_HOST="192.168.0.253"
export MQTT_BROKER_PORT="1883"
export MQTT_ENABLED="true"
- Start edge services only:
docker compose up --build -d fleet_manager robot_simulator
- Check logs:
docker compose logs -f fleet_manager
docker compose logs -f robot_simulator
If the dashboard runs on the control-plane host, use:
http://192.168.0.253:8080
If you run dashboard elsewhere, set:
PowerShell:
$env:DASHBOARD_API_BASE_URL = "http://192.168.0.253:8000"
Bash:
export DASHBOARD_API_BASE_URL="http://192.168.0.253:8000"
Then run the dashboard service on that host:
docker compose up --build -d dashboard
Set the API base URL once for command examples.
PowerShell:
$API_BASE = "http://localhost:8000" # Path A single-machine
# $API_BASE = "http://192.168.0.253:8000" # Path B control-plane host
Bash:
API_BASE="http://localhost:8000" # Path A single-machine
# API_BASE="http://192.168.0.253:8000" # Path B control-plane host
Create mission via PowerShell:
$body = @{
type = "PICKUP"
priority = 4
waypoints = @(
@{
id = 1
position = @{ x = 4.5; y = 2.0 }
action = "PICKUP"
completed = $false
}
)
estimatedDuration = 240
} | ConvertTo-Json -Depth 6
Invoke-RestMethod -Method Post -Uri "$API_BASE/missions" -Body $body -ContentType "application/json"
Create mission with advanced constraints and per-mission formula override via PowerShell:
$advanced = @{
type = "INSPECTION"
priority = 5
waypoints = @(
@{
id = 1
position = @{ x = 8.0; y = 6.0 }
action = "INSPECTION"
completed = $false
}
)
estimatedDuration = 420
constraints = @{
requiredCapability = "GENERAL"
requiredZone = "A"
allowedRobotIds = @("AMR-001", "AMR-003")
minBattery = 35
maxDistance = 12.5
customConstraints = @{
requiredVendor = "ACME"
rankingFormula = "(battery_score*0.5) + (distance_score*0.35) + (fairness_score*0.15)"
}
}
} | ConvertTo-Json -Depth 8
Invoke-RestMethod -Method Post -Uri "$API_BASE/missions" -Body $advanced -ContentType "application/json"
Create mission via Bash:
curl -fsS -X POST "${API_BASE}/missions" \
-H "Content-Type: application/json" \
-d '{
"type": "PICKUP",
"priority": 4,
"waypoints": [
{
"id": 1,
"position": {"x": 4.5, "y": 2.0},
"action": "PICKUP",
"completed": false
}
],
"estimatedDuration": 240
}'
Create mission with advanced constraints and per-mission formula override via Bash:
curl -fsS -X POST "${API_BASE}/missions" \
-H "Content-Type: application/json" \
-d '{
"type": "INSPECTION",
"priority": 5,
"waypoints": [
{
"id": 1,
"position": {"x": 8.0, "y": 6.0},
"action": "INSPECTION",
"completed": false
}
],
"estimatedDuration": 420,
"constraints": {
"requiredCapability": "GENERAL",
"requiredZone": "A",
"allowedRobotIds": ["AMR-001", "AMR-003"],
"minBattery": 35,
"maxDistance": 12.5,
"customConstraints": {
"requiredVendor": "ACME",
"rankingFormula": "(battery_score*0.5) + (distance_score*0.35) + (fairness_score*0.15)"
}
}
}'
List missions (PowerShell, then Bash):
Invoke-RestMethod -Method Get -Uri "$API_BASE/missions"
curl -fsS "${API_BASE}/missions"
Patch mission (PowerShell, then Bash):
$missionId = "<mission-id>"
$patch = @{
status = "ASSIGNED"
assignedRobotId = "AMR-001"
} | ConvertTo-Json
Invoke-RestMethod -Method Patch -Uri "$API_BASE/missions/$missionId" -Body $patch -ContentType "application/json"
mission_id="<mission-id>"
curl -fsS -X PATCH "${API_BASE}/missions/${mission_id}" \
-H "Content-Type: application/json" \
-d '{
"status": "ASSIGNED",
"assignedRobotId": "AMR-001"
}'
Register a robot with capability/zone metadata and active TTL (PowerShell, then Bash):
$register = @{
robotId = "AMR-EXT-001"
capability = "HEAVY_LIFT"
zone = "B"
ttlSec = 60
metadata = @{ owner = "vendor-a"; model = "x1200" }
} | ConvertTo-Json -Depth 6
Invoke-RestMethod -Method Post -Uri "$API_BASE/robots/register" -Body $register -ContentType "application/json"
curl -fsS -X POST "${API_BASE}/robots/register" \
-H "Content-Type: application/json" \
-d '{
"robotId": "AMR-EXT-001",
"capability": "HEAVY_LIFT",
"zone": "B",
"ttlSec": 60,
"metadata": {"owner": "vendor-a", "model": "x1200"}
}'
Refresh heartbeat before ttlSec expiry (PowerShell, then Bash):
$heartbeat = @{ ttlSec = 60 } | ConvertTo-Json
Invoke-RestMethod -Method Post -Uri "$API_BASE/robots/AMR-EXT-001/heartbeat" -Body $heartbeat -ContentType "application/json"
curl -fsS -X POST "${API_BASE}/robots/AMR-EXT-001/heartbeat" \
-H "Content-Type: application/json" \
-d '{"ttlSec": 60}'
Inspect currently active registered robots (PowerShell, then Bash):
Invoke-RestMethod -Method Get -Uri "$API_BASE/robots/active"
curl -fsS "${API_BASE}/robots/active"
If Fleet Manager is configured with REQUIRE_ROBOT_REGISTRATION=true, only active robots from /robots/active are eligible for dispatch.
- The registration heartbeat uses the REST API, not MQTT:
  - POST /robots/register
  - POST /robots/{id}/heartbeat
  - GET /robots/active
- The runtime robot telemetry heartbeat uses MQTT topic fleet/robots/status and is consumed by Dashboard and Fleet Manager.
- Inactive robot checks in Fleet Manager combine these signals:
  - telemetry staleness threshold (ROBOT_TELEMETRY_STALE_SEC, default 5.0 sec)
  - optional missed-heartbeat policy inputs (ROBOT_TELEMETRY_HEARTBEAT_INTERVAL_SEC and ROBOT_TELEMETRY_MISSED_HEARTBEATS)
  - registration activity when REQUIRE_ROBOT_REGISTRATION=true
  - effective telemetry threshold uses a conservative guard: max(stale_sec, heartbeat_interval * missed_heartbeats)
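The conservative guard above can be worked through in a few lines. This is a minimal sketch of the stated formula; the function names, and the zero defaults for the optional heartbeat inputs, are assumptions made for illustration.

```python
def effective_stale_threshold(
    stale_sec: float = 5.0,               # ROBOT_TELEMETRY_STALE_SEC default from this README
    heartbeat_interval_sec: float = 0.0,  # assumed default when the policy is unset
    missed_heartbeats: int = 0,           # assumed default when the policy is unset
) -> float:
    """max(stale_sec, heartbeat_interval * missed_heartbeats)."""
    return max(stale_sec, heartbeat_interval_sec * missed_heartbeats)


def is_stale(last_seen_sec: float, now_sec: float, threshold_sec: float) -> bool:
    """A robot is stale when its last telemetry is older than the threshold."""
    return (now_sec - last_seen_sec) > threshold_sec


if __name__ == "__main__":
    # 2.0 s interval with 3 missed heartbeats tolerated gives 6.0 s, which exceeds 5.0 s.
    threshold = effective_stale_threshold(5.0, 2.0, 3)
    print(threshold, is_stale(last_seen_sec=0.0, now_sec=6.5, threshold_sec=threshold))
```

Taking the maximum means a short staleness setting can never undercut the missed-heartbeat policy, so the larger tolerance always wins.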
Mission failover behavior:
- If a mission is ASSIGNED or IN_PROGRESS and its assigned robot becomes stale/inactive, Fleet Manager patches mission back to PENDING, clears assignedRobotId, and immediately makes it eligible for reassignment.
- Requeue counters are published in /fleet_manager/health payload as staleRequeuesAttempted and staleRequeuesSucceeded.
- Health payload also exposes liveness config/threshold details: robotTelemetryStaleSec, robotTelemetryEffectiveStaleSec, robotTelemetryHeartbeatIntervalSec, robotTelemetryMissedHeartbeats.
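The failover rule can be sketched as a pure function that decides which PATCH body, if any, to send for a mission. The field names `status` and `assignedRobotId` come from the PATCH examples in this README; `requeue_patch` is a hypothetical helper, and representing a cleared robot as `None` (JSON null) is an assumption.

```python
from typing import AbstractSet, Optional

IN_FLIGHT_STATUSES = {"ASSIGNED", "IN_PROGRESS"}


def requeue_patch(mission: dict, stale_robot_ids: AbstractSet[str]) -> Optional[dict]:
    """Return a PATCH body that requeues the mission, or None if no action is needed."""
    if mission.get("status") not in IN_FLIGHT_STATUSES:
        return None  # only in-flight missions are requeued
    if mission.get("assignedRobotId") not in stale_robot_ids:
        return None  # the assigned robot is still considered live
    return {"status": "PENDING", "assignedRobotId": None}


if __name__ == "__main__":
    mission = {"id": "m-1", "status": "IN_PROGRESS", "assignedRobotId": "AMR-001"}
    print(requeue_patch(mission, {"AMR-001"}))
```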
- Create a mission from dashboard or API.
- Confirm mission transitions PENDING -> ASSIGNED -> IN_PROGRESS -> COMPLETED/FAILED.
- Confirm robot telemetry appears in dashboard charts.
- Stop robot telemetry or let a robot go stale and observe mission requeue/reassignment for active in-flight work.
- Expected mission duration (rough estimate):
  - travel_time_sec ~= distance_m / move_speed_mps
  - default move_speed_mps is 1.0
  - with action overhead and feedback loop, a 3-10 m task usually finishes in about 5-20 seconds.
- Runtime behavior note:
  - simulator action execution now uses elapsed time-stepping plus state-aware abort checks, which reduces false timeout failures when callback scheduling is delayed.
- Timing knobs:
  - SIM_ACTION_TIMEOUT_SEC controls timeout ceiling (default 60 seconds)
  - SIM_ACTION_STEP_SEC and SIM_SLEEP_SCALE affect progress cadence
  - SIM_FAIL_PROBABILITY and SIM_FORCE_FAIL_MISSION_IDS inject failures intentionally
- If a robot turns red (ERROR) after long IN_PROGRESS:
  - verify SIM_FAIL_PROBABILITY=0.0 for demos
  - verify SIM_ACTION_TIMEOUT_SEC is high enough for your target distances
  - check fleet_manager and robot_simulator logs for timeout/failure messages
  - restart edge services after changing simulation env vars
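To check whether a target distance fits under the timeout ceiling, the rough estimate above can be turned into a quick calculation. This is a sketch only: the function names and the `overhead_sec` parameter are illustrative assumptions, and the simulator's real overhead is not modeled.

```python
def estimate_travel_time_sec(distance_m: float, move_speed_mps: float = 1.0) -> float:
    """travel_time_sec ~= distance_m / move_speed_mps."""
    if move_speed_mps <= 0:
        raise ValueError("move_speed_mps must be positive")
    return distance_m / move_speed_mps


def fits_timeout(
    distance_m: float,
    timeout_sec: float = 60.0,  # SIM_ACTION_TIMEOUT_SEC default from this README
    move_speed_mps: float = 1.0,
    overhead_sec: float = 0.0,  # assumed allowance for action overhead
) -> bool:
    """Return True when the estimated duration stays within the timeout."""
    total = estimate_travel_time_sec(distance_m, move_speed_mps) + overhead_sec
    return total <= timeout_sec


if __name__ == "__main__":
    for distance in (3.0, 10.0, 80.0):
        print(distance, estimate_travel_time_sec(distance), fits_timeout(distance, overhead_sec=5.0))
```

If `fits_timeout` is false for your typical distances, raising SIM_ACTION_TIMEOUT_SEC is the first knob to try.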
- Root architecture source file: systemarchitecture.mmd
- Additional project docs:
  - docs/contracts.md
  - docs/architecture.md
  - docs/api_examples.http
  - docs/testing_strategy.md
- If the docker command is not recognized, install Docker Desktop and reopen the terminal.
- For ROS 2 across different network segments, DDS discovery may require extra networking setup (VPN or discovery-server pattern).
