Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .ci/readability-baseline.env
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# Generated by scripts/readability-ratchet.sh
PROD_RS_TOTAL=325
PROD_FILES_GT300=108
PROD_RS_TOTAL=326
PROD_FILES_GT300=110
PROD_FILES_GT500=51
PROD_FILES_GT1000=3
PROD_MAX_FILE_LINES=1823
PROD_MAX_FILE_PATH=crates/temper-server/src/observe/evolution/insight_generator.rs
ALLOW_CLIPPY_COUNT=23
ALLOW_DEAD_CODE_COUNT=9
PROD_PRINTLN_COUNT=176
PROD_UNWRAP_CI_OK_COUNT=115
PROD_PRINTLN_COUNT=179
PROD_UNWRAP_CI_OK_COUNT=118
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ ui/observe/components/Graph3D.tsx
scripts/discord-clean-view.js
scripts/generate-graph-json.js

.proof/
.proof/*
!.proof/
!.proof/*.md
.code-review-pass
.dst-review-pass
.vercel
Expand Down
929 changes: 929 additions & 0 deletions .proof/temper-agent-e2e-proof.md

Large diffs are not rendered by default.

12 changes: 7 additions & 5 deletions crates/temper-mcp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ pub struct McpConfig {
/// Full URL of a remote Temper server (e.g. `https://api.temper.build`).
/// Mutually exclusive with `temper_port`.
pub temper_url: Option<String>,
/// Agent instance ID. Resolved from the credential registry via
/// `TEMPER_API_KEY` at startup (ADR-0033). Only used as an override
/// when credential resolution is not available.
/// Optional local agent label. When `TEMPER_API_KEY` resolves through
/// the credential registry (ADR-0033), the verified platform-assigned
/// agent ID replaces this value. This field does not grant HTTP identity.
pub agent_id: Option<String>,
/// Agent software classification (e.g. `claude-code`). Resolved from
/// the credential registry's `AgentType` entity at startup (ADR-0033).
/// Optional local agent type label (e.g. `claude-code`). When
/// `TEMPER_API_KEY` resolves through the credential registry, the
/// verified platform-assigned type replaces this value. This field does
/// not grant HTTP identity.
pub agent_type: Option<String>,
/// Session ID (`X-Session-Id`). Auto-derived from `CLAUDE_SESSION_ID`.
pub session_id: Option<String>,
Expand Down
87 changes: 87 additions & 0 deletions crates/temper-mcp/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use std::env;

use temper_mcp::{McpConfig, run_stdio_server};

fn parse_args() -> Result<McpConfig, String> {
let mut temper_port = None;
let mut temper_url = None;
let mut agent_id = None;
let mut agent_type = None;
let mut session_id = None;
let mut api_key = env::var("TEMPER_API_KEY").ok();

let mut args = env::args().skip(1);
while let Some(arg) = args.next() {
match arg.as_str() {
"--port" => {
let value = args.next().ok_or("--port requires a value")?;
let parsed = value
.parse::<u16>()
.map_err(|_| format!("invalid --port value: {value}"))?;
temper_port = Some(parsed);
}
"--url" => {
temper_url = Some(args.next().ok_or("--url requires a value")?);
}
"--agent-id" => {
agent_id = Some(args.next().ok_or("--agent-id requires a value")?);
}
"--agent-type" => {
agent_type = Some(args.next().ok_or("--agent-type requires a value")?);
}
"--session-id" => {
session_id = Some(args.next().ok_or("--session-id requires a value")?);
}
"--api-key" => {
api_key = Some(args.next().ok_or("--api-key requires a value")?);
}
"-h" | "--help" => {
print_help();
std::process::exit(0);
}
other => {
return Err(format!("unknown argument: {other}"));
}
}
}

if temper_port.is_some() && temper_url.is_some() {
return Err("use either --port or --url, not both".to_string());
}
if temper_port.is_none() && temper_url.is_none() {
return Err("either --port or --url is required".to_string());
}

Ok(McpConfig {
temper_port,
temper_url,
agent_id,
agent_type,
session_id,
api_key,
})
}

fn print_help() {
eprintln!(
"temper-mcp\n\n\
Usage:\n temper-mcp --port <PORT> [--agent-id <ID>] [--agent-type <TYPE>] [--session-id <ID>] [--api-key <KEY>]\n temper-mcp --url <URL> [--agent-id <ID>] [--agent-type <TYPE>] [--session-id <ID>] [--api-key <KEY>]\n\n\
Options:\n --port <PORT> Connect to a local Temper server on 127.0.0.1:<PORT>\n --url <URL> Connect to a Temper server at the given base URL\n --agent-id <ID> Optional local label; does not grant platform identity\n --agent-type <TYPE> Optional local type label; does not grant platform identity\n --session-id <ID> Set X-Session-Id for outbound requests\n --api-key <KEY> Bearer token for API authentication (or use TEMPER_API_KEY)\n -h, --help Show this help text"
);
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = match parse_args() {
Ok(config) => config,
Err(error) => {
eprintln!("{error}");
eprintln!();
print_help();
std::process::exit(2);
}
};

run_stdio_server(config).await?;
Ok(())
}
2 changes: 1 addition & 1 deletion crates/temper-platform/src/os_apps/mod_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ fn test_get_skill_temper_agent() {
let bundle = get_skill("temper-agent");
assert!(bundle.is_some());
let bundle = bundle.unwrap();
assert_eq!(bundle.specs.len(), 1);
assert_eq!(bundle.specs.len(), 8); // TemperAgent + AgentSoul + AgentSkill + AgentMemory + ToolHook + HeartbeatMonitor + CronJob + CronScheduler
assert!(!bundle.csdl.is_empty());
assert!(!bundle.cedar_policies.is_empty());
}
Expand Down
2 changes: 1 addition & 1 deletion crates/temper-sandbox/src/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::runner::run_sandbox;
pub struct ReplConfig {
/// Port of the running Temper HTTP server.
pub server_port: u16,
/// Agent ID for `X-Temper-Principal-Id` header.
/// Optional local label for the REPL session.
pub agent_id: Option<String>,
}

Expand Down
4 changes: 4 additions & 0 deletions crates/temper-server/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ use crate::state::ServerState;
/// A notification emitted when an entity transitions to a new state.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct EntityStateChange {
/// Monotonic per-entity event sequence.
#[serde(default)]
pub seq: u64,
/// The entity type (e.g., "Order").
pub entity_type: String,
/// The entity ID.
Expand Down Expand Up @@ -77,6 +80,7 @@ mod tests {
#[test]
fn entity_state_change_serializes() {
let change = EntityStateChange {
seq: 1,
entity_type: "Order".into(),
entity_id: "o-1".into(),
action: "SubmitOrder".into(),
Expand Down
53 changes: 50 additions & 3 deletions crates/temper-server/src/observe/entities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ pub(crate) struct WaitForEntityStateParams {
pub poll_ms: Option<u64>,
}

#[derive(Debug, Deserialize)]
pub(crate) struct EntityEventStreamParams {
pub since: Option<u64>,
}

/// GET /observe/entities/{entity_type}/{entity_id}/wait -- wait for an entity to reach a target status.
#[instrument(skip_all, fields(otel.name = "GET /observe/entities/{entity_type}/{entity_id}/wait", entity_type, entity_id))]
pub(crate) async fn handle_wait_for_entity_state(
Expand All @@ -182,15 +187,15 @@ pub(crate) async fn handle_wait_for_entity_state(

let timeout_ms = params.timeout_ms.unwrap_or(120_000).clamp(1, 300_000);
let poll_ms = params.poll_ms.unwrap_or(250).clamp(10, 5_000);
let deadline = tokio::time::Instant::now() + Duration::from_millis(timeout_ms);
let deadline = tokio::time::Instant::now() + Duration::from_millis(timeout_ms); // determinism-ok: HTTP handler, not actor code

loop {
let entity = state
.get_tenant_entity_state(&tenant, &entity_type, &entity_id)
.await
.map_err(|_| StatusCode::NOT_FOUND)?;
let status = entity.state.status.clone();
let timed_out = tokio::time::Instant::now() >= deadline;
let timed_out = tokio::time::Instant::now() >= deadline; // determinism-ok: HTTP handler, not actor code

if target_statuses.contains(&status) || timed_out {
let mut json = serde_json::to_value(&entity.state)
Expand All @@ -201,10 +206,52 @@ pub(crate) async fn handle_wait_for_entity_state(
return Ok(Json(json));
}

tokio::time::sleep(Duration::from_millis(poll_ms)).await;
tokio::time::sleep(Duration::from_millis(poll_ms)).await; // determinism-ok: HTTP handler, not actor code
}
}

/// GET /observe/entities/{entity_type}/{entity_id}/events -- replayable SSE stream for one entity.
pub(crate) async fn handle_entity_event_stream(
State(state): State<ServerState>,
headers: HeaderMap,
Path((entity_type, entity_id)): Path<(String, String)>,
Query(params): Query<EntityEventStreamParams>,
) -> Result<Sse<impl tokio_stream::Stream<Item = Result<Event, Infallible>>>, StatusCode> {
require_observe_auth(&state, &headers, "read_events", "Entity")?;
let tenant = extract_tenant(&headers, &state).map_err(|(code, _)| code)?;
let since = params.since.unwrap_or(0);
let rx = state.entity_observe_tx.subscribe();
let replay_events = state
.replay_entity_observe_events(tenant.as_str(), &entity_type, &entity_id, since)
.into_iter()
.collect::<Vec<_>>();
let replay_high_water = replay_events.last().map(|event| event.seq).unwrap_or(since);
let replay = replay_events.into_iter().map(|event| {
let data = serde_json::to_string(&event.data).unwrap_or_default();
Ok::<Event, Infallible>(Event::default().event(&event.event_name).data(data))
});
let replay_stream = tokio_stream::iter(replay);

let live_tenant = tenant.clone();
let live_entity_type = entity_type.clone();
let live_entity_id = entity_id.clone();
let live_stream = BroadcastStream::new(rx).filter_map(move |result| match result {
Ok(event)
if event.tenant == live_tenant.as_str()
&& event.entity_type == live_entity_type
&& event.entity_id == live_entity_id
&& event.seq > replay_high_water =>
{
let data = serde_json::to_string(&event.data).unwrap_or_default();
Some(Ok(Event::default().event(&event.event_name).data(data)))
}
Ok(_) => None,
Err(_) => None,
});

Ok(Sse::new(replay_stream.chain(live_stream)).keep_alive(KeepAlive::default()))
}

/// Format entity events into the history API response shape.
fn format_history_response(
entity_type: &str,
Expand Down
4 changes: 4 additions & 0 deletions crates/temper-server/src/observe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ pub fn build_observe_router() -> Router<ServerState> {
"/entities/{entity_type}/{entity_id}/wait",
get(entities::handle_wait_for_entity_state),
)
.route(
"/entities/{entity_type}/{entity_id}/events",
get(entities::handle_entity_event_stream),
)
.route("/events/stream", get(entities::handle_event_stream))
.route(
"/verification-status",
Expand Down
3 changes: 3 additions & 0 deletions crates/temper-server/src/router_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ async fn test_sse_events_endpoint_delivers_state_changes() {

// Send a state change event on the broadcast channel.
let _ = event_tx.send(EntityStateChange {
seq: 1,
entity_type: "Order".into(),
entity_id: "o-sse-1".into(),
action: "SubmitOrder".into(),
Expand Down Expand Up @@ -620,6 +621,7 @@ async fn test_sse_events_lagged_receiver_continues() {
// Flood it before any subscriber — then subscribe and send one more event.
for i in 0..300 {
let _ = event_tx.send(EntityStateChange {
seq: (i + 1) as u64,
entity_type: "Order".into(),
entity_id: format!("flood-{i}"),
action: "Flood".into(),
Expand All @@ -645,6 +647,7 @@ async fn test_sse_events_lagged_receiver_continues() {

// Send a fresh event that should be delivered.
let _ = event_tx.send(EntityStateChange {
seq: 301,
entity_type: "Order".into(),
entity_id: "after-flood".into(),
action: "Fresh".into(),
Expand Down
55 changes: 53 additions & 2 deletions crates/temper-server/src/state/dispatch/effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,66 @@ impl crate::state::ServerState {
ctx: &PostDispatchContext<'_>,
response: &EntityResponse,
) {
let _ = self.event_tx.send(EntityStateChange {
let seq =
self.next_entity_event_sequence(ctx.tenant.as_str(), ctx.entity_type, ctx.entity_id);
let change = EntityStateChange {
seq,
entity_type: ctx.entity_type.to_string(),
entity_id: ctx.entity_id.to_string(),
action: ctx.action.to_string(),
status: response.state.status.clone(),
tenant: ctx.tenant.to_string(),
agent_id: ctx.agent_ctx.agent_id.clone(),
session_id: ctx.agent_ctx.session_id.clone(),
});
};
self.record_entity_observe_event_with_seq(
ctx.tenant.as_str(),
ctx.entity_type,
ctx.entity_id,
seq,
"state_change",
serde_json::to_value(&change).unwrap_or_default(),
);
let _ = self.event_tx.send(change);
if matches!(
response.state.status.as_str(),
"Completed" | "Failed" | "Cancelled"
) {
let terminal_seq = self.next_entity_event_sequence(
ctx.tenant.as_str(),
ctx.entity_type,
ctx.entity_id,
);
let result = response
.state
.fields
.get("result")
.or_else(|| response.state.fields.get("Result"))
.and_then(serde_json::Value::as_str);
let error_message = response
.state
.fields
.get("error_message")
.or_else(|| response.state.fields.get("ErrorMessage"))
.and_then(serde_json::Value::as_str)
.or(response.error.as_deref());
self.record_entity_observe_event_with_seq(
ctx.tenant.as_str(),
ctx.entity_type,
ctx.entity_id,
terminal_seq,
"agent_complete",
serde_json::json!({
"seq": terminal_seq,
"status": response.state.status,
"action": ctx.action,
"result": result,
"error_message": error_message,
"agent_id": ctx.agent_ctx.agent_id,
"session_id": ctx.agent_ctx.session_id,
}),
);
}
let cache_key = format!("{}:{}:{}", ctx.tenant, ctx.entity_type, ctx.entity_id);
self.cache_entity_status(cache_key, response.state.status.clone());
let _ = self
Expand Down
Loading
Loading