diff --git a/charts/openab-feishu/README.md b/charts/openab-feishu/README.md index 69711e5ef..7564de178 100644 --- a/charts/openab-feishu/README.md +++ b/charts/openab-feishu/README.md @@ -72,6 +72,7 @@ Under **Permissions & Scopes**, add these scopes: | `im:message.p2p_msg:readonly` | Read DM messages | | `im:resource` | Download images/files from messages | | `contact:user.base:readonly` | Resolve user display names | +| `cardkit:card:write` | Create/update interactive streaming cards (required for card streaming, on by default) | ### 5. Publish @@ -189,6 +190,10 @@ helm install my-bot ./charts/openab-feishu \ | `platform.requireMention` | `true` | Require @mention in groups | | `platform.allowedGroups` | `[]` | Allowed group chat IDs | | `platform.allowedUsers` | `[]` | Allowed user open_ids | +| `cardStreaming.mode` | `"auto"` | Card streaming: `auto` / `card` / `post` (kill-switch) | +| `cardStreaming.fallbackToPost` | `true` | Fall back to post if a card API call fails | +| `cardStreaming.promoteBytes` | `4000` | Byte threshold for auto-promoting to a card | +| `cardStreaming.idleFinalizeMs` | `3000` | Idle window (ms) before finalizing a card | | `persistence.enabled` | `true` | Enable PVC for agent state | | `persistence.size` | `"1Gi"` | PVC size | diff --git a/charts/openab-feishu/templates/deployment.yaml b/charts/openab-feishu/templates/deployment.yaml index dbadc7962..6d9ceb424 100644 --- a/charts/openab-feishu/templates/deployment.yaml +++ b/charts/openab-feishu/templates/deployment.yaml @@ -123,6 +123,15 @@ spec: - name: FEISHU_ALLOWED_USERS value: {{ join "," .Values.platform.allowedUsers | quote }} {{- end }} + # Card streaming (interactive cards for streaming replies) + - name: FEISHU_CARD_STREAMING_MODE + value: {{ .Values.cardStreaming.mode | quote }} + - name: FEISHU_CARD_FALLBACK_TO_POST + value: {{ .Values.cardStreaming.fallbackToPost | quote }} + - name: FEISHU_CARD_PROMOTE_BYTES + value: {{ .Values.cardStreaming.promoteBytes | quote }} + - name: FEISHU_CARD_IDLE_FINALIZE_MS + value: {{ .Values.cardStreaming.idleFinalizeMs | quote }} volumeMounts: {{- if .Values.persistence.enabled }} - name: data diff --git a/charts/openab-feishu/values.yaml b/charts/openab-feishu/values.yaml index dd9f11c04..eaf7e8eef 100644 --- a/charts/openab-feishu/values.yaml +++ b/charts/openab-feishu/values.yaml @@ -93,6 +93,21 @@ platform: # -- Feishu user open_ids allowed (empty = all users) allowedUsers: [] +# -- Card streaming: render streaming replies as interactive CardKit cards +# (no 20-edit cap, native markdown / table rendering). +cardStreaming: + # -- "auto" (default: short replies stay as post, long / code / table replies + # promote to a card), "card" (always card), or "post" (kill-switch: disable + # card streaming, back to post-only behavior). + mode: "auto" + # -- Fall back to a post message if a card API call fails (recommended). + fallbackToPost: true + # -- Byte threshold at which a plain-text reply auto-promotes to a card. + promoteBytes: 4000 + # -- Idle window (ms) after which a streaming card with no further edits is + # finalized (typewriter cursor settles, markdown re-renders). + idleFinalizeMs: 3000 + # -- Persistence for agent working directory persistence: enabled: true diff --git a/docs/feishu.md b/docs/feishu.md index 515123806..dabf2519f 100644 --- a/docs/feishu.md +++ b/docs/feishu.md @@ -84,6 +84,10 @@ https://your-gateway-host/webhook/feishu | — | `FEISHU_ALLOW_USER_MESSAGES` | `multibot-mentions` | Thread response mode: `multibot-mentions` / `involved` / `mentions`. See below. | | `gateway.botUsername` | — | — | Set to bot's `open_id` for @mention gating | | `gateway.streaming` | — | `false` | Enable streaming (typewriter) mode | +| `cardStreaming.mode` | `FEISHU_CARD_STREAMING_MODE` | `auto` | Card streaming mode: `auto` (short→post, long/code/table→card), `card` (always card), `post` (disable — kill-switch) | +| `cardStreaming.fallbackToPost` | `FEISHU_CARD_FALLBACK_TO_POST` | `true` | Fall back to a post message if a card API call fails | +| `cardStreaming.promoteBytes` | `FEISHU_CARD_PROMOTE_BYTES` | `4000` | Byte threshold for auto-promoting a plain-text reply to a card | +| `cardStreaming.idleFinalizeMs` | `FEISHU_CARD_IDLE_FINALIZE_MS` | `3000` | Idle window (ms) before a streaming card is finalized | ## @mention Gating @@ -189,6 +193,36 @@ streaming = true The gateway platform must support message editing (Feishu/Lark do). Platforms that don't support editing should leave `streaming = false` (default). +## Card Streaming + +By default (`FEISHU_CARD_STREAMING_MODE=auto`), streaming replies render as +**interactive CardKit cards** when the content warrants it. Cards have no +20-edit cap (errcode 230072) and render markdown — including **tables** and code +blocks — natively, which a `post` message cannot. + +| Mode | Behavior | +|---|---| +| `auto` (default) | Short replies stay as a `post` (native reply UI); long replies, or any reply containing a code fence or a markdown table, promote to a card. | +| `card` | Every reply is sent as a card from the first message. | +| `post` | Card streaming disabled — post-only behavior (the kill-switch). | + +Notes: + +- **Auto promotion is one-way**: a reply starts as a post and, once promoted, + stays a card. Promotion deletes the post placeholder (shown as "message + recalled" in Feishu) and re-sends as a card. In `card` mode the first reply is + a card from the start, so there is no placeholder and no recall. +- **Finalize**: after ~`FEISHU_CARD_IDLE_FINALIZE_MS` ms with no further edits, + the card is rebuilt as a static card so the typewriter cursor settles and the + markdown re-renders cleanly. +- **Fallback**: if a card API call fails and `FEISHU_CARD_FALLBACK_TO_POST` is + `true` (default), the gateway falls back to the post path (with the edit-cap + recovery), so a reply is never lost. +- **Tables wrapped in a code fence**: agents sometimes wrap a markdown table in a + bare ``` fence for monospace alignment in environments that don't render + tables. On the card path the gateway unwraps a fence whose body is exactly one + table so it renders as a native table. + ## Thread (Topic) Replies When a user replies to a bot message in a group chat, Feishu creates a thread (topic). The bot replies within the same thread, and each thread gets its own independent session. diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index e07dffceb..e40160f8a 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -27,6 +27,7 @@ subtle = "2" sha1 = "0.10" quick-xml = "0.37" image = { version = "0.25", default-features = false, features = ["jpeg", "png", "gif", "webp"] } +parking_lot = "0.12" urlencoding = "2" [dev-dependencies] diff --git a/gateway/src/adapters/feishu.rs b/gateway/src/adapters/feishu.rs index c2fa70218..3cbb50b80 100644 --- a/gateway/src/adapters/feishu.rs +++ b/gateway/src/adapters/feishu.rs @@ -9,6 +9,8 @@ use std::time::Instant; use tokio::sync::RwLock; use tracing::{info, warn}; +use super::feishu_card; + /// Timing-safe string comparison to prevent side-channel attacks on tokens. fn constant_time_eq(a: &str, b: &str) -> bool { use subtle::ConstantTimeEq; @@ -83,6 +85,45 @@ pub enum AllowUsers { MultibotMentions, } +/// Streaming delivery strategy for the Feishu adapter. +/// +/// Controls how the gateway renders streaming (`edit_message`) replies: +/// - `Post` — current production behavior: PATCH a `post` message in place, +/// with the PR #1122 230072 edit-cap recovery as a backstop. +/// - `Card` — always drive a CardKit v2 interactive streaming card +/// (no 20-edit cap; native markdown / table rendering). +/// - `Auto` — start as `post`, single-direction promote to `card` when the +/// content warrants it (long text, code fences, or tables). +/// +/// Defaults to `Auto` (S6 switched the feature on): short replies stay as a +/// `post` (native reply UI), while long / code-fence / table replies promote +/// to a CardKit streaming card. `FEISHU_CARD_STREAMING_MODE=post` is the +/// no-recompile kill-switch back to today's post-only behavior. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum StreamingMode { + Post, + Card, + #[default] + Auto, +} + +impl StreamingMode { + /// Parse from the `FEISHU_CARD_STREAMING_MODE` env value. + /// + /// - `""` or `"auto"` → `Auto` (matches `Default::default()`, so an empty + /// env var and an unset env var have identical semantics). + /// - `"card"` → `Card`. + /// - Any other value (including unknown strings) → `Post` (safe fallback, + /// preserves today's behavior for operators who set an unrecognised value). + fn parse(s: &str) -> Self { + match s.trim().to_lowercase().as_str() { + "" | "auto" => StreamingMode::Auto, + "card" => StreamingMode::Card, + _ => StreamingMode::Post, + } + } +} + #[derive(Debug, Clone)] pub struct FeishuConfig { pub app_id: String, @@ -107,6 +148,22 @@ pub struct FeishuConfig { /// tracking entirely — all messages will require @mention. /// Converted from `FEISHU_SESSION_TTL_HOURS` (user-facing, in hours) to seconds internally. pub session_ttl_secs: u64, + /// Streaming delivery strategy. Defaults to `Post` (today's behavior). + /// `FEISHU_CARD_STREAMING_MODE` = post | card | auto. + pub streaming_mode: StreamingMode, + /// When a CardKit streaming attempt fails (HTTP 5xx, rate limit, element + /// cap, or any unexpected errcode), fall back to the `post` path (which + /// keeps the PR #1122 cap-recovery). `FEISHU_CARD_FALLBACK_TO_POST`, + /// defaults to `true` (double safety net). + pub card_fallback_to_post: bool, + /// In `auto` mode, the byte threshold at which a plain-text reply is large + /// enough to promote from `post` to `card`. `FEISHU_CARD_PROMOTE_BYTES`, + /// defaults to 4000 (mirrors `message_limit`, i.e. a single post message). + pub card_promote_bytes: usize, + /// Idle-timer window (milliseconds) after which a streaming card with no + /// further edits is finalized by the gateway reaper. + /// `FEISHU_CARD_IDLE_FINALIZE_MS`, defaults to 3000. + pub card_idle_finalize_ms: u64, /// Override the API base URL. Used in tests to point at a mock server. /// Always None in production (not read from env). pub api_base_override: Option, @@ -178,6 +235,27 @@ impl FeishuConfig { .unwrap_or(24) * 3600; + // --- Card streaming. S6 flipped the default to Auto: when the env var + // is unset or empty, card streaming is on in auto mode (short → post, + // long / code / table → card). FEISHU_CARD_STREAMING_MODE=post is the + // no-recompile kill-switch back to today's post-only behavior. --- + let streaming_mode = std::env::var("FEISHU_CARD_STREAMING_MODE") + .ok() + .filter(|v| !v.trim().is_empty()) + .map(|v| StreamingMode::parse(&v)) + .unwrap_or_default(); + let card_fallback_to_post = std::env::var("FEISHU_CARD_FALLBACK_TO_POST") + .map(|v| v != "false" && v != "0") + .unwrap_or(true); + let card_promote_bytes = std::env::var("FEISHU_CARD_PROMOTE_BYTES") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(4000); + let card_idle_finalize_ms = std::env::var("FEISHU_CARD_IDLE_FINALIZE_MS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(3000); + Some(Self { app_id, app_secret, @@ -196,6 +274,10 @@ impl FeishuConfig { dedupe_ttl_secs, message_limit, session_ttl_secs, + streaming_mode, + card_fallback_to_post, + card_promote_bytes, + card_idle_finalize_ms, api_base_override: None, }) } @@ -559,7 +641,7 @@ pub use event_types::*; // --------------------------------------------------------------------------- pub struct DedupeCache { - seen: std::sync::Mutex>, + seen: parking_lot::Mutex>, ttl_secs: u64, max_size: usize, } @@ -567,7 +649,7 @@ pub struct DedupeCache { impl DedupeCache { pub fn new(ttl_secs: u64) -> Self { Self { - seen: std::sync::Mutex::new(HashMap::new()), + seen: parking_lot::Mutex::new(HashMap::new()), ttl_secs, max_size: 10_000, } @@ -575,7 +657,7 @@ impl DedupeCache { /// Returns true if this id was already seen (duplicate). pub fn is_duplicate(&self, id: &str) -> bool { - let mut map = self.seen.lock().unwrap_or_else(|e| e.into_inner()); + let mut map = self.seen.lock(); // Lazy sweep if map.len() >= self.max_size { map.retain(|_, ts| ts.elapsed().as_secs() < self.ttl_secs); @@ -700,17 +782,17 @@ pub struct FeishuAdapter { pub bot_open_id: Arc>>, pub dedupe: Arc, pub rate_limiter: Arc, - pub name_cache: Arc>>, + pub name_cache: Arc>>, /// Per-channel bot turn counter. Key = chat_id, Value = (count, last_reset). /// Human message resets count to 0. Prevents runaway bot-to-bot loops. - pub bot_turns: Arc>>, // eviction: human msg resets; follow-up can add TTL like participated_threads + pub bot_turns: Arc>>, // eviction: human msg resets; follow-up can add TTL like participated_threads /// Positive-only cache: thread_id (root_id) → last_replied_at. /// When bot has replied in a thread, subsequent messages in that thread /// bypass @mention gating (like Discord's "involved" mode). - pub participated_threads: Arc>>, + pub participated_threads: Arc>>, /// Positive-only cache: thread_id → first_seen for threads where other bots /// have posted. Used by multibot-mentions mode to require @mention. - pub multibot_threads: Arc>>, + pub multibot_threads: Arc>>, /// Per-message edit count tracker for Feishu's 20-edits-per-message hard cap /// (errcode 230072 — "The message has reached the number of times it can be edited"). /// Insertion-order FIFO eviction: when over `EDIT_COUNTS_CACHE_MAX`, the @@ -720,7 +802,12 @@ pub struct FeishuAdapter { /// 4096 newer messages have been inserted behind it; that resets its count /// to 1, which is acceptable — it only loses the local preemptive margin and /// the on-wire 230072 sentinel still backstops.) - pub edit_counts: Arc>, + pub edit_counts: Arc>, + /// Active card-streaming sessions (S5), keyed by the placeholder post + /// message_id (`om_post`) that core believes it is editing. A session + /// exists only after a reply is promoted from post to card; empty unless + /// `streaming_mode` is `card` / `auto`. FIFO eviction lives in the registry. + pub stream_sessions: Arc>, pub client: reqwest::Client, } @@ -749,11 +836,14 @@ impl FeishuAdapter { dedupe, rate_limiter, bot_open_id: Arc::new(RwLock::new(None)), - name_cache: Arc::new(std::sync::Mutex::new(HashMap::new())), - bot_turns: Arc::new(std::sync::Mutex::new(HashMap::new())), - participated_threads: Arc::new(std::sync::Mutex::new(HashMap::new())), - multibot_threads: Arc::new(std::sync::Mutex::new(HashMap::new())), - edit_counts: Arc::new(std::sync::Mutex::new(EditCountsCache::default())), + name_cache: Arc::new(parking_lot::Mutex::new(HashMap::new())), + bot_turns: Arc::new(parking_lot::Mutex::new(HashMap::new())), + participated_threads: Arc::new(parking_lot::Mutex::new(HashMap::new())), + multibot_threads: Arc::new(parking_lot::Mutex::new(HashMap::new())), + edit_counts: Arc::new(parking_lot::Mutex::new(EditCountsCache::default())), + stream_sessions: Arc::new(parking_lot::Mutex::new( + feishu_card::FeishuStreamRegistry::default(), + )), client: reqwest::Client::new(), } } @@ -904,10 +994,10 @@ async fn ws_connect_loop( client: &reqwest::Client, event_tx: &broadcast::Sender, shutdown_rx: &mut watch::Receiver, - name_cache: &Arc>>, - bot_turns: &Arc>>, - participated_threads: &Arc>>, - multibot_threads: &Arc>>, + name_cache: &Arc>>, + bot_turns: &Arc>>, + participated_threads: &Arc>>, + multibot_threads: &Arc>>, ) -> anyhow::Result<()> { let api_base = config.api_base(); @@ -993,12 +1083,12 @@ async fn handle_ws_message( dedupe: &Arc, config: &FeishuConfig, event_tx: &broadcast::Sender, - name_cache: &Arc>>, + name_cache: &Arc>>, token_cache: &Arc, client: &reqwest::Client, - bot_turns: &Arc>>, - participated_threads: &Arc>>, - multibot_threads: &Arc>>, + bot_turns: &Arc>>, + participated_threads: &Arc>>, + multibot_threads: &Arc>>, ) { let envelope: FeishuEventEnvelope = match serde_json::from_str(text) { Ok(e) => e, @@ -1052,7 +1142,7 @@ async fn handle_ws_message( // Bot turn tracking: prevent runaway bot-to-bot loops let channel_id = &gateway_event.channel.id; { - let mut turns = bot_turns.lock().unwrap_or_else(|e| e.into_inner()); + let mut turns = bot_turns.lock(); if gateway_event.sender.is_bot { let count = turns.entry(channel_id.to_string()).or_insert(0); *count += 1; @@ -1120,13 +1210,13 @@ async fn handle_ws_message( /// Resolve user display name from open_id via Contact API, with caching. async fn resolve_user_name( open_id: &str, - name_cache: &Arc>>, + name_cache: &Arc>>, token_cache: &Arc, client: &reqwest::Client, api_base: &str, ) -> String { { - let cache = name_cache.lock().unwrap_or_else(|e| e.into_inner()); + let cache = name_cache.lock(); if let Some(name) = cache.get(open_id) { return name.clone(); } @@ -1151,7 +1241,7 @@ async fn resolve_user_name( // Only cache successful resolutions — don't cache fallback open_id // so retries can succeed after permissions are granted. if let Some(ref name) = resolved { - let mut cache = name_cache.lock().unwrap_or_else(|e| e.into_inner()); + let mut cache = name_cache.lock(); if cache.len() < 10_000 { cache.insert(open_id.to_string(), name.clone()); } @@ -1227,10 +1317,10 @@ pub enum EditOutcome { /// lowest-count entries) so active streams are not bumped out from under /// themselves. fn increment_edit_count( - cache: &Arc>, + cache: &Arc>, message_id: &str, ) { - let mut c = cache.lock().unwrap_or_else(|e| e.into_inner()); + let mut c = cache.lock(); let was_new = !c.counts.contains_key(message_id); let entry = c.counts.entry(message_id.to_string()).or_insert(0); if *entry != u32::MAX { @@ -1246,10 +1336,10 @@ fn increment_edit_count( /// call and signal `EditOutcome::CapReached` directly so the core finalize /// path can take over. fn mark_edit_cap( - cache: &Arc>, + cache: &Arc>, message_id: &str, ) { - let mut c = cache.lock().unwrap_or_else(|e| e.into_inner()); + let mut c = cache.lock(); let was_new = !c.counts.contains_key(message_id); c.counts.insert(message_id.to_string(), u32::MAX); if was_new { @@ -1277,10 +1367,10 @@ fn evict_if_overcap(c: &mut EditCountsCache) { /// Return true if this message_id has already reached the edit cap (either /// tracked locally or marked via 230072 sentinel). fn is_edit_cap_reached( - cache: &Arc>, + cache: &Arc>, message_id: &str, ) -> bool { - let c = cache.lock().unwrap_or_else(|e| e.into_inner()); + let c = cache.lock(); c.counts .get(message_id) .is_some_and(|&n| n >= FEISHU_EDIT_CAP) @@ -1438,6 +1528,61 @@ async fn delete_feishu_message( } } +// --------------------------------------------------------------------------- +// Streaming path decision (S2): post vs card +// --------------------------------------------------------------------------- + +/// Decide whether a streaming snapshot should be rendered as a CardKit card +/// rather than a `post` message. +/// +/// Pure function over the *full* (non-delta) accumulated text, the configured +/// mode, and the promote threshold. This is the content-based half of the +/// `auto` promotion rule; the other trigger — hitting the PATCH 20-edit cap — +/// lives in the S5 wiring, not here. +/// +/// - `Post` never promotes (today's behavior). +/// - `Card` always uses a card. +/// - `Auto` promotes when the text is large, or contains a code fence or a +/// markdown table — exactly the cases where `markdown_to_post` degrades +/// (tables are dropped entirely; see Issue #1124). +/// +/// Wired into `handle_card_edit` (S5). +fn should_use_card(text: &str, mode: StreamingMode, promote_bytes: usize) -> bool { + match mode { + StreamingMode::Post => false, + StreamingMode::Card => true, + StreamingMode::Auto => { + text.len() >= promote_bytes || has_code_fence(text) || has_markdown_table(text) + } + } +} + +/// True if the text contains a fenced code block opener (```), detected the +/// same way `markdown_to_post` detects fences: a line whose first +/// non-whitespace characters are three backticks. Inline code (single +/// backticks) is deliberately not matched. +fn has_code_fence(text: &str) -> bool { + text.split('\n') + .any(|line| line.trim_start().starts_with("```")) +} + +/// True if the text contains a GFM table delimiter row, e.g. `|---|---|` or +/// `| :--- | ---: |`. The delimiter row is the defining feature of a markdown +/// table and is precisely what `markdown_to_post` cannot render (Issue #1124). +/// +/// Heuristic: a line composed solely of pipes, dashes, colons and spaces, that +/// contains at least one pipe and at least one dash. This excludes thematic +/// breaks (`---`, no pipe), bullet lists (no pipe), and prose lines that merely +/// happen to contain a pipe (non-delimiter characters present). +fn has_markdown_table(text: &str) -> bool { + text.split('\n').any(|line| { + let t = line.trim(); + t.contains('|') + && t.contains('-') + && t.chars().all(|c| matches!(c, '|' | '-' | ':' | ' ')) + }) +} + // --------------------------------------------------------------------------- // Markdown → Feishu post conversion // --------------------------------------------------------------------------- @@ -2232,7 +2377,7 @@ async fn remove_reaction(adapter: &FeishuAdapter, message_id: &str, emoji: &str) /// (non-expired) participation entry in the cache. fn check_thread_participated( envelope: &FeishuEventEnvelope, - cache: &Arc>>, + cache: &Arc>>, session_ttl_secs: u64, ) -> bool { envelope @@ -2243,7 +2388,7 @@ fn check_thread_participated( .map(|tid| { // Intentionally recover from poisoned mutex — cache data loss is acceptable // and preferable to panicking the gateway. - let c = cache.lock().unwrap_or_else(|e| e.into_inner()); + let c = cache.lock(); c.get(tid).is_some_and(|ts| ts.elapsed().as_secs() < session_ttl_secs) }) .unwrap_or(false) @@ -2264,8 +2409,8 @@ fn detect_and_mark_multibot( envelope: &FeishuEventEnvelope, bot_open_id: Option<&str>, config: &FeishuConfig, - participated_threads: &Arc>>, - multibot_threads: &Arc>>, + participated_threads: &Arc>>, + multibot_threads: &Arc>>, ) -> bool { let self_participated = check_thread_participated( envelope, participated_threads, config.session_ttl_secs, @@ -2306,7 +2451,7 @@ fn detect_and_mark_multibot( if mentions_other_bot { info!(thread_id = %tid, "multibot thread detected via @mention"); - let mut cache = multibot_threads.lock().unwrap_or_else(|e| e.into_inner()); + let mut cache = multibot_threads.lock(); cache.entry(tid.to_string()).or_insert_with(Instant::now); if cache.len() > PARTICIPATION_CACHE_MAX { cache.retain(|_, ts| ts.elapsed().as_secs() < config.session_ttl_secs); @@ -2326,7 +2471,7 @@ fn detect_and_mark_multibot( } else { thread_id_for_check .map(|tid| { - let cache = multibot_threads.lock().unwrap_or_else(|e| e.into_inner()); + let cache = multibot_threads.lock(); cache .get(tid) .is_none_or(|ts| ts.elapsed().as_secs() >= config.session_ttl_secs) @@ -2340,7 +2485,7 @@ fn detect_and_mark_multibot( /// Record that the bot has participated in a thread. Evicts oldest entries /// when the cache exceeds PARTICIPATION_CACHE_MAX. fn record_participation( - cache: &Arc>>, + cache: &Arc>>, thread_id: &str, session_ttl_secs: u64, ) { @@ -2349,7 +2494,7 @@ fn record_participation( } // Intentionally recover from poisoned mutex — cache data loss is acceptable // and preferable to panicking the gateway. - let mut map = cache.lock().unwrap_or_else(|e| e.into_inner()); + let mut map = cache.lock(); map.insert(thread_id.to_string(), Instant::now()); // Evict if over capacity: first drop expired entries, then oldest half if still over if map.len() > PARTICIPATION_CACHE_MAX { @@ -2424,58 +2569,10 @@ pub async fn handle_reply( return; } "edit_message" => { - let outcome = edit_feishu_message( - adapter, - &reply.reply_to, - &reply.content.text, - ).await; - // Translate outcome → (success, message_id, error). For - // CapReached we deliberately do NOT append-new at the gateway - // layer (see the rationale on the CapReached arm below); we - // signal failure so core's finalize path owns recovery. - let (success, message_id, error) = match outcome { - EditOutcome::Edited => { - (true, Some(reply.reply_to.clone()), None) - } - EditOutcome::CapReached => { - // Do NOT append-new fallback at the gateway layer. Core's - // cosmetic streaming loop flushes every ~1500ms — if every - // post-cap edit spawned a new message, the user would be - // spammed with 20+ duplicate continuation messages over the - // remainder of a long reply. - // - // Instead, signal failure so: - // 1. core's mid-stream cosmetic edit loop hits its - // consecutive-failures break (3 strikes) and stops - // attempting edits, freezing the placeholder mid-content - // 2. the final delivery path in src/adapter.rs sees the - // placeholder edit fail and falls back to send_message - // so the user gets the full reply as a fresh message - // - // Net UX: half-edited placeholder + one complete continuation - // message + ✅ done reaction (vs. today's mid-truncation + 🆗 - // false success, or naive append-fallback's 25-message spam). - ( - false, - None, - Some("edit_cap_reached".to_string()), - ) - } - EditOutcome::Failed(err) => (false, None, Some(err)), - }; - if let Some(ref req_id) = reply.request_id { - let resp = crate::schema::GatewayResponse { - schema: "openab.gateway.response.v1".into(), - request_id: req_id.clone(), - success, - thread_id: None, - message_id, - error, - }; - if let Ok(json) = serde_json::to_string(&resp) { - let _ = event_tx.send(json); - } - } + // Card-streaming aware: promote post→card, stream to an existing + // card, or PATCH the post (today's behavior + #1122 cap recovery). + // Always reports om_post back to core so the swap stays invisible. + handle_card_edit(reply, adapter, event_tx).await; return; } "create_topic" | "set_reaction" => { @@ -2483,29 +2580,27 @@ pub async fn handle_reply( return; } "delete_message" => { - let result = delete_feishu_message(adapter, &reply.reply_to).await; + // If this reply was promoted to a card, the original post + // placeholder was already deleted at promotion — delete the card + // message instead. Otherwise delete the id directly (today's + // behavior). The card_message_id comes from our own session state + // (gateway-internal, trusted) so it bypasses the shape check. + let card_msg = { + let mut reg = adapter + .stream_sessions + .lock(); + reg.remove(&reply.reply_to).map(|s| s.card_message_id) + }; + let target = card_msg.as_deref().unwrap_or(&reply.reply_to); + let result = delete_feishu_message(adapter, target).await; let (success, error) = match result { Ok(()) => (true, None), Err(e) => (false, Some(e)), }; // Dormant by design: core's delete_message is fire-and-forget - // (request_id = None), so this response branch is currently - // never taken. Kept for symmetry with the other handlers and so - // delete becomes observable for free if a future caller (or - // another gateway client) sets request_id. - if let Some(ref req_id) = reply.request_id { - let resp = crate::schema::GatewayResponse { - schema: "openab.gateway.response.v1".into(), - request_id: req_id.clone(), - success, - thread_id: None, - message_id: None, - error, - }; - if let Ok(json) = serde_json::to_string(&resp) { - let _ = event_tx.send(json); - } - } + // (request_id = None), so this branch is currently never taken. + // Kept for symmetry and free observability if a caller sets one. + emit_response(event_tx, &reply.request_id, success, None, error); return; } _ => {} @@ -2536,6 +2631,16 @@ pub async fn handle_reply( let api_base = adapter.config.api_base(); let text = &reply.content.text; let limit = adapter.config.message_limit; + + // Card mode: send the first reply straight as a card (no post placeholder, + // no "message revoked" notice). Falls through to the post path if the card + // create/send fails. + if adapter.config.streaming_mode == StreamingMode::Card + && try_send_initial_card(reply, adapter, event_tx, &token, &api_base).await + { + return; + } + // quote_message_id (agent-controlled reply-to) takes priority over thread_id let reply_target = reply.quote_message_id.as_deref() .or(reply.channel.thread_id.as_deref()); @@ -2652,6 +2757,402 @@ pub async fn handle_reply( } } +// --------------------------------------------------------------------------- +// Card streaming wiring (S5) +// --------------------------------------------------------------------------- + +/// Emit a GatewayResponse back to core, but only if the reply carried a +/// request_id (fire-and-forget replies don't expect one). +fn emit_response( + event_tx: &tokio::sync::broadcast::Sender, + request_id: &Option, + success: bool, + message_id: Option, + error: Option, +) { + if let Some(req_id) = request_id { + let resp = crate::schema::GatewayResponse { + schema: "openab.gateway.response.v1".into(), + request_id: req_id.clone(), + success, + thread_id: None, + message_id, + error, + }; + if let Ok(json) = serde_json::to_string(&resp) { + let _ = event_tx.send(json); + } + } +} + +/// Handle an `edit_message` command with card-streaming awareness. +/// +/// Operates on the FULL accumulated `text` (never a delta — #565). Decision tree: +/// - Session exists (already promoted) → stream the full text to the card. +/// - No session + `should_use_card` → promote post→card. +/// - No session + post path → PATCH the post (today's behavior + #1122 cap +/// recovery); if the 20-edit cap is hit AND mode != post, treat the cap as +/// "long enough" and promote. +/// +/// `message_id` reported back to core is always the original `om_post` +/// placeholder, so the post→card swap stays invisible to core (no schema change). +async fn handle_card_edit( + reply: &GatewayReply, + adapter: &FeishuAdapter, + event_tx: &tokio::sync::broadcast::Sender, +) { + let om_post = reply.reply_to.clone(); + let text = &reply.content.text; + let mode = adapter.config.streaming_mode; + + // Snapshot session state under the lock; the std Mutex guard is dropped + // before any .await (a guard held across await would not be Send). + enum Existing { + None, + Finalized, + Active { card_id: String, seq: i64 }, + } + let existing = { + let mut reg = adapter + .stream_sessions + .lock(); + match reg.get_mut(&om_post) { + None => Existing::None, + Some(s) if s.finalized => Existing::Finalized, + Some(s) => { + // Remember the latest full text so the idle reaper can rebuild a + // static card at finalize (fixes streaming-mode table rendering). + s.last_text = text.clone(); + Existing::Active { + card_id: s.card_id.clone(), + seq: s.next_sequence(), + } + } + } + }; + + match existing { + // Idle reaper already closed the stream; content is complete. Further + // cosmetic edits are no-ops that still "succeed" (core keeps om_post). + Existing::Finalized => { + emit_response(event_tx, &reply.request_id, true, Some(om_post), None); + } + Existing::Active { card_id, seq } => { + let token = match adapter.token_cache.get_token(&adapter.client).await { + Ok(t) => t, + Err(e) => { + emit_response( + event_tx, + &reply.request_id, + false, + None, + Some(format!("token error: {e}")), + ); + return; + } + }; + let outcome = feishu_card::update_card_stream( + &adapter.client, + &adapter.config.api_base(), + &token, + &card_id, + feishu_card::STREAM_ELEMENT_ID, + text, + seq, + ) + .await; + match outcome { + // Success: the post→card swap stays invisible — core keeps om_post. + feishu_card::CardOutcome::Updated => { + emit_response(event_tx, &reply.request_id, true, Some(om_post), None); + } + // Rate limited: skip this frame. Every update sends the FULL text, + // so the next flush carries the latest content and nothing is lost. + // Report success to keep core moving. + feishu_card::CardOutcome::RateLimited => { + emit_response(event_tx, &reply.request_id, true, Some(om_post), None); + } + // Hard failure: drop the session, clean the half card (if fallback + // is enabled), and report failure so core's finalize path delivers + // the full reply as a fresh message (reusing #1122 recovery). + feishu_card::CardOutcome::Failed { code, message } => { + let card_msg = { + let mut reg = adapter + .stream_sessions + .lock(); + reg.remove(&om_post).map(|s| s.card_message_id) + }; + if adapter.config.card_fallback_to_post { + if let Some(cmid) = card_msg { + let _ = delete_feishu_message(adapter, &cmid).await; + } + } + tracing::warn!(code, msg = %message, "feishu card update failed; dropping session"); + emit_response( + event_tx, + &reply.request_id, + false, + None, + Some(format!("card_update_failed: {code}")), + ); + } + } + } + Existing::None => { + if should_use_card(text, mode, adapter.config.card_promote_bytes) { + promote_and_respond(reply, adapter, event_tx).await; + return; + } + // Post path: today's behavior + #1122 cap recovery. + match edit_feishu_message(adapter, &om_post, text).await { + EditOutcome::Edited => { + emit_response(event_tx, &reply.request_id, true, Some(om_post), None); + } + EditOutcome::CapReached => { + if mode != StreamingMode::Post { + // Long plain text hit the 18-edit cap with no fence/table: + // treat the cap signal as "long enough" → promote to card. + promote_and_respond(reply, adapter, event_tx).await; + } else { + emit_response( + event_tx, + &reply.request_id, + false, + None, + Some("edit_cap_reached".into()), + ); + } + } + EditOutcome::Failed(err) => { + emit_response(event_tx, &reply.request_id, false, None, Some(err)); + } + } + } + } +} + +/// Promote a streaming reply from post to a CardKit card: create + send the +/// card, delete the old post placeholder, register the session. Order matters — +/// the placeholder is deleted only AFTER the card is confirmed sent, so a +/// create/send failure leaves the post intact for a clean post-edit fallback. +async fn promote_and_respond( + reply: &GatewayReply, + adapter: &FeishuAdapter, + event_tx: &tokio::sync::broadcast::Sender, +) { + let om_post = reply.reply_to.clone(); + let text = &reply.content.text; + let api_base = adapter.config.api_base(); + let token = match adapter.token_cache.get_token(&adapter.client).await { + Ok(t) => t, + Err(e) => { + emit_response( + event_tx, + &reply.request_id, + false, + None, + Some(format!("token error: {e}")), + ); + return; + } + }; + + // 1. Create the card entity seeded with the current full text. + let card_id = match feishu_card::create_streaming_card(&adapter.client, &api_base, &token, text) + .await + { + Ok(id) => id, + Err(outcome) => { + tracing::warn!(?outcome, "feishu card promote: create failed; post-edit fallback"); + return promote_fallback_post(reply, adapter, event_tx).await; + } + }; + + // 2. Send the interactive card message into the same thread. + let reply_target = reply + .quote_message_id + .as_deref() + .or(reply.channel.thread_id.as_deref()); + let card_msg_id = match feishu_card::send_card_message( + &adapter.client, + &api_base, + &token, + &reply.channel.id, + reply_target, + &card_id, + ) + .await + { + Some(id) => id, + None => { + tracing::warn!("feishu card promote: send failed; post-edit fallback"); + return promote_fallback_post(reply, adapter, event_tx).await; + } + }; + + // 3. Delete the old post placeholder (best effort; shape pre-validated). + let _ = delete_feishu_message(adapter, &om_post).await; + + // 4. Register the session keyed by om_post (core stays oblivious). Seed + // last_text with the current full text for the finalize rebuild. + { + let mut reg = adapter + .stream_sessions + .lock(); + reg.promote(&om_post, card_id, card_msg_id, text.to_string()); + } + tracing::info!(om_post = %om_post, "feishu reply promoted post → card"); + + // 5. Core keeps editing om_post. + emit_response(event_tx, &reply.request_id, true, Some(om_post), None); +} + +/// Fallback when promotion fails: edit the (still-present) post placeholder as +/// today. Always emits a GatewayResponse. +async fn promote_fallback_post( + reply: &GatewayReply, + adapter: &FeishuAdapter, + event_tx: &tokio::sync::broadcast::Sender, +) { + let om_post = reply.reply_to.clone(); + match edit_feishu_message(adapter, &om_post, &reply.content.text).await { + EditOutcome::Edited => { + emit_response(event_tx, &reply.request_id, true, Some(om_post), None) + } + EditOutcome::CapReached => emit_response( + event_tx, + &reply.request_id, + false, + None, + Some("edit_cap_reached".into()), + ), + EditOutcome::Failed(err) => { + emit_response(event_tx, &reply.request_id, false, None, Some(err)) + } + } +} + +/// In card mode, send the very first reply straight as a card — no post +/// placeholder, no "message revoked" notice. The session is keyed by the +/// card's own message_id (the id core will edit), so there is no post→card +/// swap at all. Returns true if the card was sent and a response emitted; +/// false if it failed and the caller should fall back to the post path. +async fn try_send_initial_card( + reply: &GatewayReply, + adapter: &FeishuAdapter, + event_tx: &tokio::sync::broadcast::Sender, + token: &str, + api_base: &str, +) -> bool { + let text = &reply.content.text; + let card_id = + match feishu_card::create_streaming_card(&adapter.client, api_base, token, text).await { + Ok(id) => id, + Err(outcome) => { + tracing::warn!(?outcome, "feishu initial card create failed; post fallback"); + return false; + } + }; + let reply_target = reply + .quote_message_id + .as_deref() + .or(reply.channel.thread_id.as_deref()); + let card_msg_id = match feishu_card::send_card_message( + &adapter.client, + api_base, + token, + &reply.channel.id, + reply_target, + &card_id, + ) + .await + { + Some(id) => id, + None => { + tracing::warn!("feishu initial card send failed; post fallback"); + return false; + } + }; + // Session keyed by the card's own message_id — core edits this id directly, + // so subsequent edit_message calls land on the Active path with no swap. + { + let mut reg = adapter + .stream_sessions + .lock(); + reg.promote(&card_msg_id, card_id, card_msg_id.clone(), text.to_string()); + } + // Mirror the post path's self-echo dedupe + thread participation tracking. + adapter.dedupe.is_duplicate(&card_msg_id); + if let Some(tid) = reply.channel.thread_id.as_deref() { + record_participation( + &adapter.participated_threads, + tid, + adapter.config.session_ttl_secs, + ); + } + tracing::info!(card_msg_id = %card_msg_id, "feishu first reply sent directly as card (no placeholder)"); + emit_response(event_tx, &reply.request_id, true, Some(card_msg_id), None); + true +} + +/// Background idle-finalize reaper for card streaming. Periodically scans for +/// sessions idle ≥ `idle_ms`, rebuilds each as a STATIC card via +/// `finish_card_stream` (so the typewriter cursor settles and markdown +/// re-renders), and marks them finalized. Finalized sessions are kept (not +/// removed) so a late cosmetic edit is a clean no-op; FIFO eviction reclaims +/// them eventually. Spawned once at startup (main.rs), only when +/// `streaming_mode != post`. +pub async fn run_idle_reaper( + stream_sessions: Arc>, + token_cache: Arc, + client: reqwest::Client, + api_base: String, + idle_ms: u64, +) { + let tick = std::time::Duration::from_millis(idle_ms.clamp(250, 1000)); + loop { + tokio::time::sleep(tick).await; + let keys = { + let reg = stream_sessions.lock(); + reg.idle_keys(idle_ms) + }; + for key in keys { + // Mark finalized + grab (card_id, next seq) under the lock so two + // reaper ticks can't double-finalize the same session. + let target = { + let mut reg = stream_sessions.lock(); + match reg.get_mut(&key) { + Some(s) if !s.finalized => { + s.mark_finalized(); + Some((s.card_id.clone(), s.last_text.clone(), s.next_sequence())) + } + _ => None, + } + }; + let Some((card_id, text, seq)) = target else { + continue; + }; + let token = match token_cache.get_token(&client).await { + Ok(t) => t, + Err(e) => { + tracing::warn!(err = %e, "feishu reaper: token error, skip finalize"); + continue; + } + }; + match feishu_card::finish_card_stream(&client, &api_base, &token, &card_id, &text, seq) + .await + { + feishu_card::CardOutcome::Updated => { + tracing::info!(card_id = %card_id, "feishu card stream finalized (idle)"); + } + other => { + tracing::warn!(card_id = %card_id, ?other, "feishu card finalize failed"); + } + } + } + } +} + /// Split text into chunks of at most `limit` bytes, breaking at newline or /// space boundaries when possible. Safe for multi-byte UTF-8 (e.g. Chinese). fn split_text(text: &str, limit: usize) -> Vec<&str> { @@ -2693,7 +3194,7 @@ const WEBHOOK_BODY_LIMIT: usize = 1_048_576; /// Simple per-IP rate limiter state. pub struct RateLimiter { - counts: std::sync::Mutex>, + counts: parking_lot::Mutex>, window_secs: u64, max_requests: u64, } @@ -2701,7 +3202,7 @@ pub struct RateLimiter { impl RateLimiter { pub fn new(window_secs: u64, max_requests: u64) -> Self { Self { - counts: std::sync::Mutex::new(HashMap::new()), + counts: parking_lot::Mutex::new(HashMap::new()), window_secs, max_requests, } @@ -2709,7 +3210,7 @@ impl RateLimiter { /// Returns true if the request should be rejected (rate exceeded). pub fn check(&self, key: &str) -> bool { - let mut map = self.counts.lock().unwrap_or_else(|e| e.into_inner()); + let mut map = self.counts.lock(); // Lazy cleanup if map.len() > 4096 { map.retain(|_, (_, ts)| ts.elapsed().as_secs() < self.window_secs); @@ -3005,10 +3506,114 @@ mod tests { dedupe_ttl_secs: 300, message_limit: 4000, session_ttl_secs: 86400, + streaming_mode: StreamingMode::Post, + card_fallback_to_post: true, + card_promote_bytes: 4000, + card_idle_finalize_ms: 3000, api_base_override: None, } } + // --- Streaming mode config tests (S1) --- + + #[test] + fn streaming_mode_default_is_auto() { + // S6 switched the feature on: the type default is now Auto, so an + // unset env var enables card streaming in auto mode. + assert_eq!(StreamingMode::default(), StreamingMode::Auto); + } + + #[test] + fn streaming_mode_parse() { + // Empty → Auto, matching Default::default() (unset env var == empty env var). + assert_eq!(StreamingMode::parse(""), StreamingMode::Auto); + assert_eq!(StreamingMode::parse("auto"), StreamingMode::Auto); + assert_eq!(StreamingMode::parse("AUTO"), StreamingMode::Auto); + assert_eq!(StreamingMode::parse("post"), StreamingMode::Post); + assert_eq!(StreamingMode::parse("POST"), StreamingMode::Post); + // Unknown / garbage values must not silently enable the feature. + assert_eq!(StreamingMode::parse("garbage"), StreamingMode::Post); + assert_eq!(StreamingMode::parse("on"), StreamingMode::Post); + } + + #[test] + fn streaming_mode_parse_card_and_auto() { + assert_eq!(StreamingMode::parse("card"), StreamingMode::Card); + assert_eq!(StreamingMode::parse("CARD"), StreamingMode::Card); + assert_eq!(StreamingMode::parse("auto"), StreamingMode::Auto); + assert_eq!(StreamingMode::parse("Auto"), StreamingMode::Auto); + // Surrounding whitespace is tolerated (env values often carry it). + assert_eq!(StreamingMode::parse(" card "), StreamingMode::Card); + assert_eq!(StreamingMode::parse(" auto "), StreamingMode::Auto); + } + + // --- should_use_card tests (S2) --- + + #[test] + fn should_use_card_post_mode_never_promotes() { + // Post mode is today's behavior: nothing ever promotes, regardless of + // size, fences, or tables. + let big = "x".repeat(10_000); + assert!(!should_use_card(&big, StreamingMode::Post, 4000)); + assert!(!should_use_card( + "```rust\nfn main() {}\n```", + StreamingMode::Post, + 4000 + )); + assert!(!should_use_card( + "| a | b |\n| --- | --- |\n| 1 | 2 |", + StreamingMode::Post, + 4000 + )); + } + + #[test] + fn should_use_card_card_mode_always_promotes() { + assert!(should_use_card("hi", StreamingMode::Card, 4000)); + assert!(should_use_card("", StreamingMode::Card, 4000)); + } + + #[test] + fn should_use_card_auto_promotes_on_length() { + // At/over threshold promotes; just under stays post. + let at = "x".repeat(4000); + assert!(should_use_card(&at, StreamingMode::Auto, 4000)); + let under = "x".repeat(3999); + assert!(!should_use_card(&under, StreamingMode::Auto, 4000)); + } + + #[test] + fn should_use_card_auto_promotes_on_code_fence() { + assert!(should_use_card("here:\n```\ncode\n```", StreamingMode::Auto, 4000)); + assert!(should_use_card("```python\nprint(1)\n```", StreamingMode::Auto, 4000)); + // Indented fence (markdown_to_post trims leading whitespace too). + assert!(should_use_card(" ```\ncode", StreamingMode::Auto, 4000)); + // Inline single backticks are NOT a fence. + assert!(!should_use_card("use `cargo` to build", StreamingMode::Auto, 4000)); + } + + #[test] + fn should_use_card_auto_promotes_on_table() { + let table = "| Name | Age |\n| --- | --- |\n| Bob | 30 |"; + assert!(should_use_card(table, StreamingMode::Auto, 4000)); + // Colon-aligned delimiter row. + let aligned = "| a | b |\n| :--- | ---: |\n| 1 | 2 |"; + assert!(should_use_card(aligned, StreamingMode::Auto, 4000)); + // A bare delimiter row alone is enough signal to promote. + assert!(should_use_card("|---|---|", StreamingMode::Auto, 4000)); + } + + #[test] + fn should_use_card_auto_plain_short_text_stays_post() { + assert!(!should_use_card("just a short reply", StreamingMode::Auto, 4000)); + // A line with a pipe but no delimiter row is not a table. + assert!(!should_use_card("a | b but not a table", StreamingMode::Auto, 4000)); + // Bullet list with dashes is not a table (no pipe). + assert!(!should_use_card("- item one\n- item two", StreamingMode::Auto, 4000)); + // Thematic break (---) is not a table (no pipe). + assert!(!should_use_card("above\n\n---\n\nbelow", StreamingMode::Auto, 4000)); + } + // --- Token tests --- #[tokio::test] @@ -3376,7 +3981,7 @@ mod tests { let config = test_config(); let token_cache = Arc::new(FeishuTokenCache::with_base(&config, &server.uri())); - let name_cache = Arc::new(std::sync::Mutex::new(HashMap::new())); + let name_cache = Arc::new(parking_lot::Mutex::new(HashMap::new())); let client = reqwest::Client::new(); let name = resolve_user_name("ou_user1", &name_cache, &token_cache, &client, &server.uri()).await; @@ -3407,7 +4012,7 @@ mod tests { let config = test_config(); let token_cache = Arc::new(FeishuTokenCache::with_base(&config, &server.uri())); - let name_cache = Arc::new(std::sync::Mutex::new(HashMap::new())); + let name_cache = Arc::new(parking_lot::Mutex::new(HashMap::new())); let client = reqwest::Client::new(); let name = resolve_user_name("ou_unknown", &name_cache, &token_cache, &client, &server.uri()).await; @@ -3585,16 +4190,16 @@ mod tests { #[test] fn record_participation_and_eviction() { - let cache = Arc::new(std::sync::Mutex::new(HashMap::new())); + let cache = Arc::new(parking_lot::Mutex::new(HashMap::new())); // Record a thread record_participation(&cache, "thread_1", 86400); - assert_eq!(cache.lock().unwrap().len(), 1); + assert_eq!(cache.lock().len(), 1); // Fill beyond PARTICIPATION_CACHE_MAX for i in 0..PARTICIPATION_CACHE_MAX + 10 { record_participation(&cache, &format!("thread_{i}"), 86400); } // After eviction, should be roughly half - assert!(cache.lock().unwrap().len() <= PARTICIPATION_CACHE_MAX); + assert!(cache.lock().len() <= PARTICIPATION_CACHE_MAX); } // --- Multibot-mentions mode tests --- @@ -3777,8 +4382,8 @@ mod tests { // --- Edit-cap helpers (F3/F4/F8/F10): no network required --- - fn fresh_cache() -> Arc> { - Arc::new(std::sync::Mutex::new(EditCountsCache::default())) + fn fresh_cache() -> Arc> { + Arc::new(parking_lot::Mutex::new(EditCountsCache::default())) } #[test] @@ -3847,7 +4452,7 @@ mod tests { mark_edit_cap(&cache, "om_msg1"); increment_edit_count(&cache, "om_msg1"); // Increment must not push past u32::MAX sentinel. - let map = cache.lock().unwrap(); + let map = cache.lock(); assert_eq!(map.counts.get("om_msg1").copied(), Some(u32::MAX)); } @@ -3868,7 +4473,7 @@ mod tests { // survive. increment_edit_count(&cache, "om_active_recent"); - let map = cache.lock().unwrap(); + let map = cache.lock(); // FIFO eviction: the newest insert must still be present. assert!( map.counts.contains_key("om_active_recent"), @@ -3985,7 +4590,7 @@ mod tests { matches!(outcome, EditOutcome::Edited), "HTTP 200 + code 0 must yield Edited" ); - let map = adapter.edit_counts.lock().unwrap(); + let map = adapter.edit_counts.lock(); assert_eq!(map.counts.get("om_ok").copied(), Some(1)); } @@ -4013,7 +4618,7 @@ mod tests { "HTTP 200 + code 99991 must yield Failed, not Edited" ); // Failure must NOT increment the edit count. - let map = adapter.edit_counts.lock().unwrap(); + let map = adapter.edit_counts.lock(); assert_eq!(map.counts.get("om_err").copied(), None); } @@ -4101,4 +4706,273 @@ mod tests { let reason = att.status.unwrap(); assert!(reason.contains("unsupported format"), "got: {reason}"); } + + // --- Card streaming wiring (S5) --- + + fn token_mock() -> Mock { + Mock::given(method("POST")) + .and(path("/open-apis/auth/v3/tenant_access_token/internal")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "code": 0, "tenant_access_token": "t-test", "expire": 7200 + }))) + } + + fn edit_reply( + reply_to: &str, + text: &str, + thread_id: Option<&str>, + request_id: Option<&str>, + ) -> crate::schema::GatewayReply { + crate::schema::GatewayReply { + schema: "openab.gateway.reply.v1".into(), + reply_to: reply_to.into(), + platform: "feishu".into(), + channel: crate::schema::ReplyChannel { + id: "oc_chat".into(), + thread_id: thread_id.map(|s| s.into()), + }, + content: crate::schema::Content { + content_type: "text".into(), + text: text.into(), + attachments: vec![], + }, + command: Some("edit_message".into()), + request_id: request_id.map(|s| s.into()), + quote_message_id: None, + } + } + + /// Post mode (default) must keep today's behavior: PATCH the post in place, + /// never create a card session. + #[tokio::test] + async fn s5_edit_post_mode_patches_without_session() { + let server = MockServer::start().await; + token_mock().mount(&server).await; + Mock::given(method("PUT")) + .and(path("/open-apis/im/v1/messages/om_ph1")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({"code": 0}))) + .expect(1) + .mount(&server) + .await; + let mut config = test_config(); + config.api_base_override = Some(server.uri()); + config.streaming_mode = StreamingMode::Post; + let adapter = FeishuAdapter::new(config); + let (tx, mut rx) = tokio::sync::broadcast::channel(16); + + handle_reply(&edit_reply("om_ph1", "short reply", None, Some("r1")), &adapter, &tx).await; + + assert!( + adapter.stream_sessions.lock().is_empty(), + "post mode must not create a session" + ); + let resp: crate::schema::GatewayResponse = + serde_json::from_str(&rx.try_recv().unwrap()).unwrap(); + assert!(resp.success); + assert_eq!(resp.message_id.as_deref(), Some("om_ph1")); + } + + /// Card mode: the first edit promotes post→card (create + send + delete old + /// placeholder) and registers a session keyed by the placeholder id. + #[tokio::test] + async fn s5_edit_card_mode_promotes_and_registers_session() { + let server = MockServer::start().await; + token_mock().mount(&server).await; + Mock::given(method("POST")) + .and(path("/open-apis/cardkit/v1/cards")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "code": 0, "data": {"card_id": "card777"} + }))) + .expect(1) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/open-apis/im/v1/messages/om_root/reply")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "code": 0, "data": {"message_id": "om_cardmsg"} + }))) + .expect(1) + .mount(&server) + .await; + Mock::given(method("DELETE")) + .and(path("/open-apis/im/v1/messages/om_ph2")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({"code": 0}))) + .expect(1) + .mount(&server) + .await; + let mut config = test_config(); + config.api_base_override = Some(server.uri()); + config.streaming_mode = StreamingMode::Card; + let adapter = FeishuAdapter::new(config); + let (tx, mut rx) = tokio::sync::broadcast::channel(16); + + handle_reply( + &edit_reply("om_ph2", "promote me", Some("om_root"), Some("r2")), + &adapter, + &tx, + ) + .await; + + { + let reg = adapter.stream_sessions.lock(); + let s = reg.get("om_ph2").expect("session created on promote"); + assert_eq!(s.card_id, "card777"); + assert_eq!(s.card_message_id, "om_cardmsg"); + } + let resp: crate::schema::GatewayResponse = + serde_json::from_str(&rx.try_recv().unwrap()).unwrap(); + assert!(resp.success); + // The swap is invisible to core: it keeps editing the placeholder id. + assert_eq!(resp.message_id.as_deref(), Some("om_ph2")); + } + + /// An existing (promoted) session streams the FULL accumulated text plus a + /// strictly increasing sequence — the #565 guard at the wiring level. + #[tokio::test] + async fn s5_edit_existing_session_streams_full_content() { + let server = MockServer::start().await; + token_mock().mount(&server).await; + Mock::given(method("PUT")) + .and(path( + "/open-apis/cardkit/v1/cards/card777/elements/md_stream/content", + )) + // Exact body match: full content (not a delta) + sequence 1. + .and(body_json(serde_json::json!({ + "content": "full snapshot v2", + "sequence": 1 + }))) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({"code": 0}))) + .expect(1) + .mount(&server) + .await; + let mut config = test_config(); + config.api_base_override = Some(server.uri()); + config.streaming_mode = StreamingMode::Card; + let adapter = FeishuAdapter::new(config); + adapter.stream_sessions.lock().promote( + "om_ph3", + "card777".into(), + "om_cardmsg".into(), + "seed".into(), + ); + let (tx, mut rx) = tokio::sync::broadcast::channel(16); + + handle_reply( + &edit_reply("om_ph3", "full snapshot v2", None, Some("r3")), + &adapter, + &tx, + ) + .await; + + let resp: crate::schema::GatewayResponse = + serde_json::from_str(&rx.try_recv().unwrap()).unwrap(); + assert!(resp.success); + assert_eq!(resp.message_id.as_deref(), Some("om_ph3")); + } + + /// A hard card-update failure drops the session (and cleans the half card) + /// so core's finalize path delivers the full reply as a fresh message — + /// the #63-style cleanup path. + #[tokio::test] + async fn s5_edit_card_failure_drops_session() { + let server = MockServer::start().await; + token_mock().mount(&server).await; + Mock::given(method("PUT")) + .and(path( + "/open-apis/cardkit/v1/cards/card_x/elements/md_stream/content", + )) + .respond_with(ResponseTemplate::new(400).set_body_json(serde_json::json!({ + "code": 300317, "msg": "sequence did not increment" + }))) + .expect(1) + .mount(&server) + .await; + // fallback_to_post = true (default) → clean the half card. + Mock::given(method("DELETE")) + .and(path("/open-apis/im/v1/messages/om_cardmsg")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({"code": 0}))) + .expect(1) + .mount(&server) + .await; + let mut config = test_config(); + config.api_base_override = Some(server.uri()); + config.streaming_mode = StreamingMode::Card; + let adapter = FeishuAdapter::new(config); + adapter.stream_sessions.lock().promote( + "om_ph4", + "card_x".into(), + "om_cardmsg".into(), + "seed".into(), + ); + let (tx, mut rx) = tokio::sync::broadcast::channel(16); + + handle_reply(&edit_reply("om_ph4", "v", None, Some("r4")), &adapter, &tx).await; + + assert!( + !adapter.stream_sessions.lock().contains("om_ph4"), + "failed card update must drop the session" + ); + let resp: crate::schema::GatewayResponse = + serde_json::from_str(&rx.try_recv().unwrap()).unwrap(); + assert!(!resp.success, "failure must surface so core finalize takes over"); + } + + /// Card mode: the FIRST reply (command=None) goes straight to a card with no + /// post placeholder and no revoke. Session is keyed by the card's own id. + #[tokio::test] + async fn s5_card_mode_first_send_is_card_no_placeholder() { + let server = MockServer::start().await; + token_mock().mount(&server).await; + Mock::given(method("POST")) + .and(path("/open-apis/cardkit/v1/cards")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "code": 0, "data": {"card_id": "cardZ"} + }))) + .expect(1) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/open-apis/im/v1/messages")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "code": 0, "data": {"message_id": "om_cardZ"} + }))) + .expect(1) + .mount(&server) + .await; + let mut config = test_config(); + config.api_base_override = Some(server.uri()); + config.streaming_mode = StreamingMode::Card; + let adapter = FeishuAdapter::new(config); + let (tx, mut rx) = tokio::sync::broadcast::channel(16); + let reply = crate::schema::GatewayReply { + schema: "openab.gateway.reply.v1".into(), + reply_to: "evt_1".into(), + platform: "feishu".into(), + channel: crate::schema::ReplyChannel { + id: "oc_chat".into(), + thread_id: None, + }, + content: crate::schema::Content { + content_type: "text".into(), + text: "first chunk".into(), + attachments: vec![], + }, + command: None, + request_id: Some("r1".into()), + quote_message_id: None, + }; + + handle_reply(&reply, &adapter, &tx).await; + + // Session keyed by the card's own message_id (no post→card swap). + { + let reg = adapter.stream_sessions.lock(); + let s = reg.get("om_cardZ").expect("session keyed by card message id"); + assert_eq!(s.card_id, "cardZ"); + } + let resp: crate::schema::GatewayResponse = + serde_json::from_str(&rx.try_recv().unwrap()).unwrap(); + assert!(resp.success); + assert_eq!(resp.message_id.as_deref(), Some("om_cardZ")); + } } diff --git a/gateway/src/adapters/feishu_card.rs b/gateway/src/adapters/feishu_card.rs new file mode 100644 index 000000000..9c5577644 --- /dev/null +++ b/gateway/src/adapters/feishu_card.rs @@ -0,0 +1,1171 @@ +//! Feishu CardKit v2 streaming card client. +//! +//! Thin reqwest wrapper around the Feishu CardKit v1 REST API +//! (`/open-apis/cardkit/v1/*`), used to drive streaming replies that are free +//! of the `PATCH /im/v1/messages` 20-edit cap (errcode 230072) and render +//! markdown / tables natively (Issue #1124). +//! +//! Endpoint shapes are taken from the official docs: +//! - Create card entity: POST `/open-apis/cardkit/v1/cards` +//! - Stream text update: PUT `/open-apis/cardkit/v1/cards/:card_id/elements/:element_id/content` +//! - Update card config: PATCH `/open-apis/cardkit/v1/cards/:card_id/settings` +//! - Send card message: POST `/open-apis/im/v1/messages[/:id/reply]` (msg_type=interactive) +//! +//! Design notes / pitfalls baked in: +//! - The stream-text API takes the FULL accumulated text, NOT a delta. Feishu +//! renders a typewriter effect only when the new text has the old text as a +//! prefix; otherwise it replaces wholesale. Passing deltas is the +//! openclaw-lark #565 bug — guarded by `update_card_stream_*` tests. +//! - `sequence` must STRICTLY increase across every operation on one card +//! (errcode 300317 otherwise). The counter is owned by the session (S4); +//! this module just forwards whatever sequence it is given. +//! - A card entity can be SENT only once and expires after 14 days; the +//! session (S4/S5) creates a fresh entity per streaming reply. +//! - `streaming_mode` must be on (set at create time) for the content API to +//! work (errcode 300309 / 200850 otherwise); finalize turns it back off. +//! +// Most of this module is wired in as of S5 (REST client + session registry + +// idle reaper). A few convenience APIs remain exercised only by tests or +// reserved for phase two (card splitting): `FeishuStreamRegistry::{get, +// contains, len, is_empty}`. + +use serde_json::Value; +use std::borrow::Cow; +use std::collections::{HashMap, VecDeque}; +use std::time::Instant; + +/// Element ID of the single streaming markdown element in our MVP card. +/// Must match between `markdown_to_card_v2` (create) and `update_card_stream`. +pub const STREAM_ELEMENT_ID: &str = "md_stream"; + +/// Outcome of a CardKit streaming operation, classified for the S5 caller. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum CardOutcome { + /// Operation applied (HTTP 2xx + code 0). + Updated, + /// Rate limited (HTTP 429). Caller may skip this frame and retry later. + RateLimited, + /// Any other failure: HTTP error status, non-zero errcode, or a transport + /// / body-parse error. Caller should fall back to the post path. Carries + /// the Feishu errcode (0 when not a structured API error) plus a short + /// message for logging. + Failed { code: i64, message: String }, +} + +// --------------------------------------------------------------------------- +// Redundant table-fence stripping (E2E fix, 2026-06-19) +// +// Agents often wrap a GFM table in a bare ``` fence to get monospace alignment +// in environments that DON'T render tables (Discord, terminals). Feishu cards +// DO render tables, so on the card path we unwrap a fence whose body is exactly +// one complete GFM table — platform-correct adaptation, the adapter's job. +// Strict guards keep false positives/negatives near zero; never drops bytes. +// --------------------------------------------------------------------------- + +/// A bare code fence line: trimmed content is exactly three backticks (no +/// language tag). Tagged fences (e.g. ```rust) are never treated as bare. +fn is_bare_fence(line: &str) -> bool { + line.trim() == "```" +} + +/// Index of the next bare fence at or after `start` (the closing fence). +fn find_closing_fence(lines: &[&str], start: usize) -> Option { + (start..lines.len()).find(|&j| is_bare_fence(lines[j])) +} + +/// Trim leading/trailing blank lines; return the `[start, end)` bounds. +fn trim_blank_bounds(lines: &[&str]) -> (usize, usize) { + let mut s = 0; + let mut e = lines.len(); + while s < e && lines[s].trim().is_empty() { + s += 1; + } + while e > s && lines[e - 1].trim().is_empty() { + e -= 1; + } + (s, e) +} + +/// Count GFM table columns in a row (outer pipes stripped before splitting). +fn count_table_cols(line: &str) -> usize { + let t = line.trim(); + let t = t.strip_prefix('|').unwrap_or(t); + let t = t.strip_suffix('|').unwrap_or(t); + t.split('|').count() +} + +/// A GFM delimiter cell: optional leading `:`, one-or-more `-`, optional +/// trailing `:` (matches `^:?-+:?$` on the trimmed cell). +fn is_delim_cell(cell: &str) -> bool { + let c = cell.trim().as_bytes(); + if c.is_empty() { + return false; + } + let mut i = 0; + if c[i] == b':' { + i += 1; + } + let dash_start = i; + while i < c.len() && c[i] == b'-' { + i += 1; + } + if i == dash_start { + return false; // need at least one dash + } + if i < c.len() && c[i] == b':' { + i += 1; + } + i == c.len() +} + +/// True if `body` (the lines between fences) is EXACTLY one well-formed GFM +/// table and nothing else: a header row with pipes, a delimiter row whose +/// column count matches and whose cells are all valid delimiters, and ≥1 data +/// row. Blank lines are tolerated only at the leading/trailing edges; any +/// interior blank line or non-table line disqualifies it. +fn is_single_gfm_table(body: &[&str]) -> bool { + let (s, e) = trim_blank_bounds(body); + let rows = &body[s..e]; + if rows.len() < 3 { + return false; // header + delimiter + at least one data row + } + if rows.iter().any(|l| l.trim().is_empty()) { + return false; // no interior blank lines + } + if !rows[0].contains('|') { + return false; // header must have a pipe + } + let cols = count_table_cols(rows[0]); + if cols == 0 { + return false; + } + let delim = rows[1].trim(); + if !delim.contains('|') || !delim.contains('-') || count_table_cols(rows[1]) != cols { + return false; + } + let dinner = delim.strip_prefix('|').unwrap_or(delim); + let dinner = dinner.strip_suffix('|').unwrap_or(dinner); + if !dinner.split('|').all(is_delim_cell) { + return false; + } + // Every data row must contain a pipe. + rows[2..].iter().all(|l| l.contains('|')) +} + +/// Unwrap any bare code fence whose body is exactly one GFM table, so Feishu +/// renders it as a native table instead of a code block. Only CLOSED, untagged +/// fences qualify — an unclosed fence (mid-stream) and any non-table body are +/// left untouched, so streaming snapshots are stable and never flicker. +/// Returns `Cow::Borrowed` unchanged when nothing matches (never drops bytes). +fn strip_redundant_table_fence(text: &str) -> Cow<'_, str> { + if !text.contains("```") { + return Cow::Borrowed(text); + } + let lines: Vec<&str> = text.split('\n').collect(); + let mut out: Vec<&str> = Vec::with_capacity(lines.len()); + let mut changed = false; + let mut i = 0; + while i < lines.len() { + if is_bare_fence(lines[i]) { + if let Some(close) = find_closing_fence(&lines, i + 1) { + let body = &lines[i + 1..close]; + if is_single_gfm_table(body) { + let (s, e) = trim_blank_bounds(body); + out.extend_from_slice(&body[s..e]); + changed = true; + } else { + // Not a lone table: keep the whole fenced block verbatim + // (including both fence lines) so nothing is altered. + out.extend_from_slice(&lines[i..=close]); + } + i = close + 1; + continue; + } + // Unclosed fence (mid-stream): leave the rest as-is. + } + out.push(lines[i]); + i += 1; + } + if changed { + Cow::Owned(out.join("\n")) + } else { + Cow::Borrowed(text) + } +} + +/// Build a CardKit JSON 2.0 card holding a single markdown element. +/// +/// - `schema` is pinned to `"2.0"` (the only structure the API accepts). +/// - `streaming` toggles `streaming_mode`: `true` for the live typewriter card, +/// `false` for the finalized static card. +/// - The text is first passed through `strip_redundant_table_fence` so a +/// table wrapped in a bare ``` fence renders as a native table. +/// - `update_multi` is left at its default (`true`); setting it `false` is +/// rejected in streaming mode (errcode 300302), so we never emit it. +pub fn markdown_to_card_v2(text: &str, streaming: bool) -> Value { + let text = strip_redundant_table_fence(text); + let text = text.as_ref(); + let mut config = serde_json::json!({ "streaming_mode": streaming }); + if streaming { + config["streaming_config"] = serde_json::json!({ "print_strategy": "fast" }); + } + serde_json::json!({ + "schema": "2.0", + "config": config, + "body": { + "elements": [ + { + "tag": "markdown", + "content": text, + "element_id": STREAM_ELEMENT_ID + } + ] + } + }) +} + +/// Classify a CardKit (PUT/PATCH) response into a `CardOutcome`. +async fn classify(resp: reqwest::Response, op: &'static str) -> CardOutcome { + let status = resp.status(); + if status.as_u16() == 429 { + tracing::warn!(op, "feishu cardkit rate limited (429)"); + return CardOutcome::RateLimited; + } + let body: Value = match resp.json().await { + Ok(v) => v, + Err(e) => { + return CardOutcome::Failed { + code: 0, + message: format!("{op}: bad response body: {e}"), + }; + } + }; + let code = body.get("code").and_then(|c| c.as_i64()).unwrap_or(-1); + if status.is_success() && code == 0 { + CardOutcome::Updated + } else { + let msg = body + .get("msg") + .and_then(|m| m.as_str()) + .unwrap_or("") + .to_string(); + tracing::warn!(op, %status, code, msg = %msg, "feishu cardkit op failed"); + CardOutcome::Failed { code, message: msg } + } +} + +/// Create a streaming card entity seeded with the initial (full) text. +/// Returns the new `card_id` on success, or a `CardOutcome` describing the +/// failure (so the caller can decide between fallback and skip). +pub async fn create_streaming_card( + client: &reqwest::Client, + api_base: &str, + token: &str, + text: &str, +) -> Result { + let card = markdown_to_card_v2(text, true); + let body = serde_json::json!({ + "type": "card_json", + "data": card.to_string(), + }); + let url = format!("{api_base}/open-apis/cardkit/v1/cards"); + let resp = match client + .post(&url) + .bearer_auth(token) + .header("Content-Type", "application/json; charset=utf-8") + .json(&body) + .send() + .await + { + Ok(r) => r, + Err(e) => { + return Err(CardOutcome::Failed { + code: 0, + message: format!("create: request error: {e}"), + }) + } + }; + let status = resp.status(); + if status.as_u16() == 429 { + return Err(CardOutcome::RateLimited); + } + let rb: Value = match resp.json().await { + Ok(v) => v, + Err(e) => { + return Err(CardOutcome::Failed { + code: 0, + message: format!("create: bad response body: {e}"), + }) + } + }; + let code = rb.get("code").and_then(|c| c.as_i64()).unwrap_or(-1); + if status.is_success() && code == 0 { + if let Some(id) = rb.pointer("/data/card_id").and_then(|v| v.as_str()) { + tracing::info!(card_id = %id, "feishu streaming card created"); + return Ok(id.to_string()); + } + return Err(CardOutcome::Failed { + code: 0, + message: "create: response missing data.card_id".into(), + }); + } + let msg = rb + .get("msg") + .and_then(|m| m.as_str()) + .unwrap_or("") + .to_string(); + tracing::warn!(%status, code, msg = %msg, "feishu create card failed"); + Err(CardOutcome::Failed { code, message: msg }) +} + +/// Send an interactive card message referencing `card_id`. +/// +/// Mirrors `feishu::send_post_message`: uses the reply API when `reply_to` is +/// `Some(root_id)` (stays in-thread), otherwise creates a new message in the +/// chat. Returns the sent `message_id` on success. +pub async fn send_card_message( + client: &reqwest::Client, + api_base: &str, + token: &str, + chat_id: &str, + reply_to: Option<&str>, + card_id: &str, +) -> Option { + let content = serde_json::json!({ + "type": "card", + "data": { "card_id": card_id } + }) + .to_string(); + + let (url, body) = if let Some(root_id) = reply_to { + ( + format!("{api_base}/open-apis/im/v1/messages/{root_id}/reply"), + serde_json::json!({ + "msg_type": "interactive", + "content": content, + }), + ) + } else { + ( + format!("{api_base}/open-apis/im/v1/messages?receive_id_type=chat_id"), + serde_json::json!({ + "receive_id": chat_id, + "msg_type": "interactive", + "content": content, + }), + ) + }; + + match client + .post(&url) + .bearer_auth(token) + .header("Content-Type", "application/json; charset=utf-8") + .json(&body) + .send() + .await + { + Ok(resp) if resp.status().is_success() => { + let rb: Value = resp.json().await.unwrap_or_default(); + let mid = rb + .pointer("/data/message_id") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + tracing::info!(chat_id = %chat_id, reply_to = ?reply_to, message_id = ?mid, "feishu card message sent"); + mid + } + Ok(resp) => { + let status = resp.status(); + let t = resp.text().await.unwrap_or_default(); + tracing::error!(status = %status, body = %t, "feishu send card message failed"); + None + } + Err(e) => { + tracing::error!(err = %e, "feishu send card message request failed"); + None + } + } +} + +/// Stream a FULL text snapshot to the card's markdown element. +/// +/// `text` MUST be the cumulative content, never a delta (#565). `sequence` +/// must strictly increase per card across all operations (#317). This +/// function forwards both verbatim and does not accumulate, truncate, or +/// diff — accumulation is the session's job (S5). +pub async fn update_card_stream( + client: &reqwest::Client, + api_base: &str, + token: &str, + card_id: &str, + element_id: &str, + text: &str, + sequence: i64, +) -> CardOutcome { + // Three-exit consistency: unwrap a redundant table fence here too, so a + // closed table fence renders natively mid-stream (no flicker) and matches + // the create/finalize paths. + let text = strip_redundant_table_fence(text); + let url = + format!("{api_base}/open-apis/cardkit/v1/cards/{card_id}/elements/{element_id}/content"); + let body = serde_json::json!({ + "content": text.as_ref(), + "sequence": sequence, + }); + match client + .put(&url) + .bearer_auth(token) + .header("Content-Type", "application/json; charset=utf-8") + .json(&body) + .send() + .await + { + Ok(resp) => classify(resp, "update_content").await, + Err(e) => CardOutcome::Failed { + code: 0, + message: format!("update: request error: {e}"), + }, + } +} + +/// Finalize the stream by REPLACING the card with a STATIC card carrying the +/// final full text (`PUT /cardkit/v1/cards/:id`, full card/update). +/// +/// A full replace forces Feishu to re-parse the markdown statically, which is +/// what fixes GFM tables: during streaming they render as a code block, and a +/// PATCH-settings (streaming_mode=false) alone only stops the cursor without +/// re-rendering — the broken table would persist. Replacing with a static card +/// re-renders the table correctly (E2E finding, 2026-06-19). `sequence` must be +/// greater than the last content update's sequence. +pub async fn finish_card_stream( + client: &reqwest::Client, + api_base: &str, + token: &str, + card_id: &str, + text: &str, + sequence: i64, +) -> CardOutcome { + let card = markdown_to_card_v2(text, false); + let body = serde_json::json!({ + "card": { "type": "card_json", "data": card.to_string() }, + "sequence": sequence, + }); + let url = format!("{api_base}/open-apis/cardkit/v1/cards/{card_id}"); + match client + .put(&url) + .bearer_auth(token) + .header("Content-Type", "application/json; charset=utf-8") + .json(&body) + .send() + .await + { + Ok(resp) => classify(resp, "finish_card_update").await, + Err(e) => CardOutcome::Failed { + code: 0, + message: format!("finish: request error: {e}"), + }, + } +} + +// --------------------------------------------------------------------------- +// Streaming session state (S4): registry + state machine + FIFO eviction +// --------------------------------------------------------------------------- + +/// Max concurrent card-streaming sessions before FIFO eviction kicks in. +const STREAM_SESSIONS_MAX: usize = 1024; + +/// State of one promoted card-streaming reply. +/// +/// A session exists only AFTER a reply has been promoted from the post path to +/// a CardKit card. In the registry it is keyed by the placeholder *post* +/// message_id that core still believes it is editing (`om_post`), so core stays +/// oblivious to the post→card swap — no core or schema change. +pub struct FeishuStreamSession { + /// CardKit card entity id — the streaming target for content updates. + pub card_id: String, + /// message_id of the sent interactive card message (for cleanup / delete). + pub card_message_id: String, + /// Monotonic op counter. Every CardKit op (content update, finalize) must + /// use a STRICTLY increasing sequence, else Feishu rejects it (errcode + /// 300317). The session owns this counter. + pub sequence: i64, + /// Last activity time, for the idle-finalize reaper (S5). + pub last_activity: Instant, + /// True once finalize (streaming_mode off) has been sent, so the reaper + /// does not double-finalize. + pub finalized: bool, + /// Latest full text snapshot pushed to the card. The idle reaper uses it to + /// rebuild a STATIC card at finalize (which fixes streaming-mode tables). + pub last_text: String, +} + +impl FeishuStreamSession { + fn new(card_id: String, card_message_id: String, initial_text: String) -> Self { + Self { + card_id, + card_message_id, + sequence: 0, + last_activity: Instant::now(), + finalized: false, + last_text: initial_text, + } + } + + /// Advance to the next strictly-increasing sequence and refresh activity. + /// Used for every content update and for finalize. + pub fn next_sequence(&mut self) -> i64 { + self.sequence += 1; + self.last_activity = Instant::now(); + self.sequence + } + + /// Mark finalized so the idle reaper won't finalize this session again. + pub fn mark_finalized(&mut self) { + self.finalized = true; + } + + /// Whether this session has been idle for at least `idle_ms` and is not + /// yet finalized — i.e. a finalize candidate for the reaper. + pub fn is_idle(&self, idle_ms: u64) -> bool { + !self.finalized && self.last_activity.elapsed().as_millis() as u64 >= idle_ms + } +} + +/// Registry of active card-streaming sessions, keyed by `om_post`. +/// +/// Insertion-order FIFO eviction mirrors `EditCountsCache` in feishu.rs: the +/// oldest *insertions* age out first, which strongly favours keeping active +/// (recently promoted) streams over stale ones. +#[derive(Default)] +pub struct FeishuStreamRegistry { + sessions: HashMap, + order: VecDeque, +} + +impl FeishuStreamRegistry { + /// Register a freshly-promoted session under its placeholder `om_post` key. + /// Promotion is one-way; if the key somehow already exists it is replaced + /// but keeps its FIFO position (no duplicate `order` entry). + pub fn promote( + &mut self, + om_post: &str, + card_id: String, + card_message_id: String, + initial_text: String, + ) { + let was_new = !self.sessions.contains_key(om_post); + self.sessions.insert( + om_post.to_string(), + FeishuStreamSession::new(card_id, card_message_id, initial_text), + ); + if was_new { + self.order.push_back(om_post.to_string()); + self.evict_if_overcap(); + } + } + + /// Returns the session for `om_post`, if any. + /// + /// Reserved for phase-two card splitting; production callers use `get_mut`. + #[allow(dead_code)] + pub fn get(&self, om_post: &str) -> Option<&FeishuStreamSession> { + self.sessions.get(om_post) + } + + pub fn get_mut(&mut self, om_post: &str) -> Option<&mut FeishuStreamSession> { + self.sessions.get_mut(om_post) + } + + /// Returns `true` if a session for `om_post` exists. + /// + /// Reserved for phase-two card splitting. + #[allow(dead_code)] + pub fn contains(&self, om_post: &str) -> bool { + self.sessions.contains_key(om_post) + } + + /// Remove a session (after finalize / cleanup). `order` keeps the key; the + /// FIFO evictor tolerates and skips entries already gone from `sessions`. + pub fn remove(&mut self, om_post: &str) -> Option { + self.sessions.remove(om_post) + } + + /// Number of active sessions. + /// + /// Reserved for phase-two card splitting. + #[allow(dead_code)] + pub fn len(&self) -> usize { + self.sessions.len() + } + + /// Returns `true` if no sessions are active. + /// + /// Reserved for phase-two card splitting. + #[allow(dead_code)] + pub fn is_empty(&self) -> bool { + self.sessions.is_empty() + } + + /// Keys of sessions that are finalize candidates for the idle reaper: they + /// have actually streamed at least once (`sequence > 0`), are idle ≥ + /// `idle_ms`, and are not yet finalized. + /// + /// The `sequence > 0` guard is essential. A freshly created session (first + /// reply sent as card, or a just-promoted card) sits at sequence 0 while + /// core is still thinking before the first content edit. Finalizing it then + /// would freeze the placeholder before any content arrives — exactly the + /// bug that the post path avoids by not creating a session until the first + /// edit. So the reaper waits until at least one real stream update lands. + pub fn idle_keys(&self, idle_ms: u64) -> Vec { + self.sessions + .iter() + .filter(|(_, s)| s.sequence > 0 && s.is_idle(idle_ms)) + .map(|(k, _)| k.clone()) + .collect() + } + + /// FIFO eviction: when over `STREAM_SESSIONS_MAX`, drop the oldest half by + /// insertion order. Tolerant of order/sessions drift — keys already gone + /// from `sessions` are skipped without counting toward the eviction quota. + fn evict_if_overcap(&mut self) { + if self.sessions.len() > STREAM_SESSIONS_MAX { + let target = self.sessions.len() / 2; + let mut evicted = 0; + while evicted < target { + match self.order.pop_front() { + Some(oldest) => { + if self.sessions.remove(&oldest).is_some() { + evicted += 1; + } + } + None => break, + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use wiremock::matchers::{body_json, body_partial_json, method, path}; + use wiremock::{Mock, MockServer, ResponseTemplate}; + + // --- markdown_to_card_v2 (pure) --- + + #[test] + fn card_v2_streaming_has_schema_and_element() { + let card = markdown_to_card_v2("hello", true); + assert_eq!(card["schema"], "2.0"); + assert_eq!(card["config"]["streaming_mode"], true); + assert_eq!(card["config"]["streaming_config"]["print_strategy"], "fast"); + // update_multi must NOT be emitted as false (300302). + assert!(card["config"].get("update_multi").is_none()); + let el = &card["body"]["elements"][0]; + assert_eq!(el["tag"], "markdown"); + assert_eq!(el["element_id"], STREAM_ELEMENT_ID); + assert_eq!(el["content"], "hello"); + } + + #[test] + fn card_v2_static_disables_streaming() { + // The finalize rebuild uses a static card: streaming_mode=false and no + // streaming_config. This is what re-renders GFM tables correctly. + let card = markdown_to_card_v2("| a | b |\n| - | - |\n| 1 | 2 |", false); + assert_eq!(card["config"]["streaming_mode"], false); + assert!(card["config"].get("streaming_config").is_none()); + assert_eq!(card["body"]["elements"][0]["element_id"], STREAM_ELEMENT_ID); + } + + // --- strip_redundant_table_fence (E2E table fix) --- + + const TBL: &str = "| 特性 | Box | Rc |\n| --- | --- | --- |\n| 所有權 | 獨佔 | 共享 |"; + + #[test] + fn strip_unwraps_bare_fenced_table() { + let wrapped = format!("## 標題\n\n```\n{TBL}\n```\n\n結尾"); + let got = strip_redundant_table_fence(&wrapped); + assert_eq!(got.as_ref(), format!("## 標題\n\n{TBL}\n\n結尾")); + } + + #[test] + fn strip_unwraps_multiple_independent_fences() { + let input = format!("```\n{TBL}\n```\n中間\n```\n{TBL}\n```"); + let got = strip_redundant_table_fence(&input); + assert_eq!(got.as_ref(), format!("{TBL}\n中間\n{TBL}")); + } + + #[test] + fn strip_tolerates_edge_blank_lines_in_fence() { + let input = format!("```\n\n{TBL}\n\n```"); + assert_eq!(strip_redundant_table_fence(&input).as_ref(), TBL); + } + + #[test] + fn strip_handles_aligned_delimiters() { + let tbl = "| a | b | c |\n| :-- | :-: | --: |\n| 1 | 2 | 3 |"; + let input = format!("```\n{tbl}\n```"); + assert_eq!(strip_redundant_table_fence(&input).as_ref(), tbl); + } + + #[test] + fn strip_is_idempotent() { + let wrapped = format!("```\n{TBL}\n```"); + let once = strip_redundant_table_fence(&wrapped).into_owned(); + let twice = strip_redundant_table_fence(&once); + assert_eq!(twice.as_ref(), once); + } + + // negatives: must NOT unwrap (Cow::Borrowed, content unchanged) + + #[test] + fn strip_keeps_language_tagged_fence() { + let input = "```rust\nfn main() {}\n```"; + assert!(matches!( + strip_redundant_table_fence(input), + std::borrow::Cow::Borrowed(_) + )); + } + + #[test] + fn strip_keeps_fence_with_table_plus_prose() { + let input = format!("```\n{TBL}\n這是正文\n```"); + assert!(matches!( + strip_redundant_table_fence(&input), + std::borrow::Cow::Borrowed(_) + )); + } + + #[test] + fn strip_keeps_non_table_fence() { + let input = "```\njust some text\nmore text\n```"; + assert!(matches!( + strip_redundant_table_fence(input), + std::borrow::Cow::Borrowed(_) + )); + } + + #[test] + fn strip_keeps_unclosed_fence_midstream() { + let input = format!("```\n{TBL}"); + assert!(matches!( + strip_redundant_table_fence(&input), + std::borrow::Cow::Borrowed(_) + )); + } + + #[test] + fn strip_keeps_table_with_interior_blank_line() { + let input = "```\n| a | b |\n| - | - |\n\n| 1 | 2 |\n```"; + assert!(matches!( + strip_redundant_table_fence(input), + std::borrow::Cow::Borrowed(_) + )); + } + + #[test] + fn strip_keeps_header_delimiter_only_no_data() { + let input = "```\n| a | b |\n| - | - |\n```"; + assert!(matches!( + strip_redundant_table_fence(input), + std::borrow::Cow::Borrowed(_) + )); + } + + #[test] + fn strip_keeps_mismatched_delimiter_cols() { + let input = "```\n| a | b | c |\n| - | - |\n| 1 | 2 | 3 |\n```"; + assert!(matches!( + strip_redundant_table_fence(input), + std::borrow::Cow::Borrowed(_) + )); + } + + #[test] + fn strip_no_fence_is_borrowed() { + let input = "just a plain reply with no fence"; + assert!(matches!( + strip_redundant_table_fence(input), + std::borrow::Cow::Borrowed(_) + )); + } + + #[test] + fn strip_keeps_nested_language_fence_in_body() { + // body contains a ```rust line → non-table line → keep verbatim. + let input = "```\n| a | b |\n| - | - |\n```rust\nfn x() {}\n```"; + assert!(matches!( + strip_redundant_table_fence(input), + std::borrow::Cow::Borrowed(_) + )); + } + + // streaming monotonicity + invariants + + #[test] + fn strip_streaming_unclosed_then_closed() { + // While unclosed → no-op (stable); once closed → unwrap. + let open = "```\n| a | b |\n| - | - |\n| 1 | 2 |"; + assert!(matches!( + strip_redundant_table_fence(open), + std::borrow::Cow::Borrowed(_) + )); + let closed = format!("{open}\n```"); + assert_eq!( + strip_redundant_table_fence(&closed).as_ref(), + "| a | b |\n| - | - |\n| 1 | 2 |" + ); + } + + #[test] + fn strip_edge_cases_do_not_panic() { + for s in [ + "", + "```", + "```\n", + "\n\n", + "```\n```", + "| a |\r\n| - |\r\n| 1 |", + ] { + let _ = strip_redundant_table_fence(s); + } + } + + #[test] + fn strip_three_exit_consistency() { + // create (markdown_to_card_v2 true) and finalize (false) unwrap + // identically; update_card_stream shares the same helper. + let wrapped = format!("```\n{TBL}\n```"); + let create = markdown_to_card_v2(&wrapped, true); + let finalize = markdown_to_card_v2(&wrapped, false); + assert_eq!(create["body"]["elements"][0]["content"], TBL); + assert_eq!(finalize["body"]["elements"][0]["content"], TBL); + } + + // --- create_streaming_card --- + + #[tokio::test] + async fn create_card_success_returns_card_id() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/open-apis/cardkit/v1/cards")) + // Only assert the envelope; the escaped `data` string is covered by + // the pure card_v2 test above. + .and(body_partial_json(serde_json::json!({ "type": "card_json" }))) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "code": 0, + "msg": "success", + "data": { "card_id": "7355372766134157313" } + }))) + .expect(1) + .mount(&server) + .await; + let client = reqwest::Client::new(); + let id = create_streaming_card(&client, &server.uri(), "tok", "initial") + .await + .expect("should return card_id"); + assert_eq!(id, "7355372766134157313"); + } + + #[tokio::test] + async fn create_card_errcode_is_failed() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/open-apis/cardkit/v1/cards")) + .respond_with(ResponseTemplate::new(400).set_body_json(serde_json::json!({ + "code": 300305, + "msg": "The number of card components exceeds 200" + }))) + .expect(1) + .mount(&server) + .await; + let client = reqwest::Client::new(); + let out = create_streaming_card(&client, &server.uri(), "tok", "x") + .await + .expect_err("should fail"); + assert_eq!( + out, + CardOutcome::Failed { + code: 300305, + message: "The number of card components exceeds 200".into() + } + ); + } + + // --- update_card_stream (#565 guard: FULL content, not delta) --- + + #[tokio::test] + async fn update_card_stream_sends_full_content_and_sequence() { + let server = MockServer::start().await; + let card_id = "7355439197428236291"; + let full = "Hello, world!\nThis is the full cumulative snapshot."; + // body_json is an EXACT match: if the client ever sent a delta, an + // extra field, or dropped the sequence, this would fail. This is the + // openclaw-lark #565 guard at the wire level. + Mock::given(method("PUT")) + .and(path(format!( + "/open-apis/cardkit/v1/cards/{card_id}/elements/md_stream/content" + ))) + .and(body_json(serde_json::json!({ + "content": full, + "sequence": 3 + }))) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "code": 0, "msg": "success", "data": {} + }))) + .expect(1) + .mount(&server) + .await; + let client = reqwest::Client::new(); + let out = update_card_stream( + &client, + &server.uri(), + "tok", + card_id, + STREAM_ELEMENT_ID, + full, + 3, + ) + .await; + assert_eq!(out, CardOutcome::Updated); + } + + #[tokio::test] + async fn update_card_stream_rate_limited_429() { + let server = MockServer::start().await; + Mock::given(method("PUT")) + .respond_with(ResponseTemplate::new(429)) + .expect(1) + .mount(&server) + .await; + let client = reqwest::Client::new(); + let out = update_card_stream( + &client, + &server.uri(), + "tok", + "cid", + STREAM_ELEMENT_ID, + "text", + 1, + ) + .await; + assert_eq!(out, CardOutcome::RateLimited); + } + + #[tokio::test] + async fn update_card_stream_sequence_errcode_is_failed() { + let server = MockServer::start().await; + Mock::given(method("PUT")) + .respond_with(ResponseTemplate::new(400).set_body_json(serde_json::json!({ + "code": 300317, + "msg": "The sequence number for operating on the card did not increment consecutively" + }))) + .expect(1) + .mount(&server) + .await; + let client = reqwest::Client::new(); + let out = update_card_stream( + &client, + &server.uri(), + "tok", + "cid", + STREAM_ELEMENT_ID, + "text", + 1, + ) + .await; + assert!(matches!(out, CardOutcome::Failed { code: 300317, .. })); + } + + // --- finish_card_stream (full card/update → static card, re-renders tables) --- + + #[tokio::test] + async fn finish_card_stream_rebuilds_static_card() { + let server = MockServer::start().await; + let card_id = "7355439197428236291"; + // Finalize = full card/update (PUT /cards/:id) replacing with a STATIC + // card. Build the expected escaped data the same way the client does. + let expected_data = markdown_to_card_v2("final text", false).to_string(); + Mock::given(method("PUT")) + .and(path(format!("/open-apis/cardkit/v1/cards/{card_id}"))) + .and(body_json(serde_json::json!({ + "card": { "type": "card_json", "data": expected_data }, + "sequence": 9 + }))) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "code": 0, "msg": "success", "data": {} + }))) + .expect(1) + .mount(&server) + .await; + let client = reqwest::Client::new(); + let out = + finish_card_stream(&client, &server.uri(), "tok", card_id, "final text", 9).await; + assert_eq!(out, CardOutcome::Updated); + } + + // --- send_card_message (interactive, reply + create) --- + + #[tokio::test] + async fn send_card_message_reply_in_thread() { + let server = MockServer::start().await; + let root = "om_root123"; + // Build the expected content the same way the client does, so the + // exact-string match is robust to serde_json key ordering. + let expected_content = serde_json::json!({ + "type": "card", + "data": { "card_id": "cardabc" } + }) + .to_string(); + Mock::given(method("POST")) + .and(path(format!("/open-apis/im/v1/messages/{root}/reply"))) + .and(body_json(serde_json::json!({ + "msg_type": "interactive", + "content": expected_content + }))) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "code": 0, "msg": "success", + "data": { "message_id": "om_sent789" } + }))) + .expect(1) + .mount(&server) + .await; + let client = reqwest::Client::new(); + let mid = + send_card_message(&client, &server.uri(), "tok", "oc_chat", Some(root), "cardabc").await; + assert_eq!(mid.as_deref(), Some("om_sent789")); + } + + #[tokio::test] + async fn send_card_message_new_message_to_chat() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/open-apis/im/v1/messages")) + .and(body_partial_json(serde_json::json!({ + "receive_id": "oc_chat", + "msg_type": "interactive" + }))) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "code": 0, "msg": "success", + "data": { "message_id": "om_new456" } + }))) + .expect(1) + .mount(&server) + .await; + let client = reqwest::Client::new(); + let mid = send_card_message(&client, &server.uri(), "tok", "oc_chat", None, "cardabc").await; + assert_eq!(mid.as_deref(), Some("om_new456")); + } + + // --- streaming session registry (S4) --- + + #[test] + fn session_next_sequence_strictly_increases() { + let mut s = FeishuStreamSession::new("card1".into(), "om_msg1".into(), "txt".into()); + assert_eq!(s.sequence, 0); + assert_eq!(s.next_sequence(), 1); + assert_eq!(s.next_sequence(), 2); + assert_eq!(s.next_sequence(), 3); + assert_eq!(s.sequence, 3); + } + + #[test] + fn session_finalize_blocks_idle_candidacy() { + let mut s = FeishuStreamSession::new("c".into(), "m".into(), "t".into()); + // idle_ms=0 → any non-finalized session is an idle candidate. + assert!(s.is_idle(0)); + s.mark_finalized(); + assert!(!s.is_idle(0)); + } + + #[test] + fn session_not_idle_before_window() { + let s = FeishuStreamSession::new("c".into(), "m".into(), "t".into()); + // Just created → not idle under a 10s window. + assert!(!s.is_idle(10_000)); + } + + #[test] + fn registry_promote_get_remove() { + let mut reg = FeishuStreamRegistry::default(); + assert!(!reg.contains("om_post1")); + reg.promote("om_post1", "card_a".into(), "om_card_msg".into(), "seed".into()); + assert!(reg.contains("om_post1")); + let s = reg.get("om_post1").expect("session present"); + assert_eq!(s.card_id, "card_a"); + assert_eq!(s.card_message_id, "om_card_msg"); + assert_eq!(s.sequence, 0); + assert_eq!(s.last_text, "seed"); + // Mutate the sequence through the registry. + let seq = reg.get_mut("om_post1").unwrap().next_sequence(); + assert_eq!(seq, 1); + // Remove. + let removed = reg.remove("om_post1").expect("removed"); + assert_eq!(removed.card_id, "card_a"); + assert!(!reg.contains("om_post1")); + } + + #[test] + fn registry_promote_is_one_way_no_duplicate_order() { + let mut reg = FeishuStreamRegistry::default(); + reg.promote("om_post1", "card_a".into(), "m1".into(), "t1".into()); + reg.get_mut("om_post1").unwrap().next_sequence(); // seq = 1 + // Re-promoting the same key replaces the session and keeps a single + // order entry (a fresh session, sequence reset). + reg.promote("om_post1", "card_b".into(), "m2".into(), "t2".into()); + assert_eq!(reg.len(), 1); + let s = reg.get("om_post1").unwrap(); + assert_eq!(s.card_id, "card_b"); + assert_eq!(s.sequence, 0); + } + + #[test] + fn registry_idle_keys_excludes_finalized() { + let mut reg = FeishuStreamRegistry::default(); + reg.promote("om_a", "card_a".into(), "m_a".into(), "ta".into()); + reg.promote("om_b", "card_b".into(), "m_b".into(), "tb".into()); + // Both have streamed at least once (sequence > 0). + reg.get_mut("om_a").unwrap().next_sequence(); + reg.get_mut("om_b").unwrap().next_sequence(); + reg.get_mut("om_b").unwrap().mark_finalized(); + let mut idle = reg.idle_keys(0); + idle.sort(); + assert_eq!(idle, vec!["om_a".to_string()]); + } + + #[test] + fn registry_idle_keys_excludes_unstreamed_session() { + // A freshly created session at sequence 0 (core still thinking before + // the first content edit) must NOT be a finalize candidate — otherwise + // the reaper freezes the placeholder before any content streams in. + let mut reg = FeishuStreamRegistry::default(); + reg.promote("om_fresh", "c".into(), "m".into(), "...".into()); + assert!(reg.idle_keys(0).is_empty()); + // After the first stream update it becomes a candidate. + reg.get_mut("om_fresh").unwrap().next_sequence(); + assert_eq!(reg.idle_keys(0), vec!["om_fresh".to_string()]); + } + + #[test] + fn registry_fifo_eviction_keeps_recent() { + let mut reg = FeishuStreamRegistry::default(); + // Insert past the cap; oldest insertions evicted, newest kept. + let total = STREAM_SESSIONS_MAX + 5; + for i in 0..total { + reg.promote( + &format!("om_{i}"), + format!("card_{i}"), + format!("m_{i}"), + format!("t_{i}"), + ); + } + assert!( + reg.len() <= STREAM_SESSIONS_MAX, + "len {} should be <= cap {STREAM_SESSIONS_MAX}", + reg.len() + ); + // The most recently inserted session must survive. + assert!(reg.contains(&format!("om_{}", total - 1))); + // The very first inserted session should have been evicted. + assert!(!reg.contains("om_0")); + } +} diff --git a/gateway/src/adapters/mod.rs b/gateway/src/adapters/mod.rs index 94a2a8a79..a1ab7fa6d 100644 --- a/gateway/src/adapters/mod.rs +++ b/gateway/src/adapters/mod.rs @@ -1,4 +1,5 @@ pub mod feishu; +pub mod feishu_card; pub mod googlechat; pub mod line; pub mod teams; diff --git a/gateway/src/main.rs b/gateway/src/main.rs index b5cad2f6a..c5dfec24f 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -299,6 +299,20 @@ async fn main() -> Result<()> { // Resolve feishu bot identity early (needed for mention gating in both modes) if let Some(ref f) = feishu { f.resolve_bot_identity().await; + // Card-streaming idle-finalize reaper: only when streaming is enabled. + // With mode=post (default) no sessions are ever created, so spawning it + // would just idle — skip entirely to keep the default path untouched. + if f.config.streaming_mode != adapters::feishu::StreamingMode::Post { + let sessions = f.stream_sessions.clone(); + let token_cache = f.token_cache.clone(); + let client = f.client.clone(); + let api_base = f.config.api_base(); + let idle_ms = f.config.card_idle_finalize_ms; + tokio::spawn(adapters::feishu::run_idle_reaper( + sessions, token_cache, client, api_base, idle_ms, + )); + info!(idle_ms, "feishu card-streaming idle reaper started"); + } } // Google Chat adapter