Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/capabilities/slack.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ The daemon listens for `app_mention` events on Sam's bot and for non-mention rep

Sam does not respond to every message in a channel — only direct @-mentions and replies in threads Sam is already part of. People can talk in channels Sam is in without Sam jumping in.

### Multi-channel and Scheduled Top-level Routing

Sam can operate across multiple Slack channels when invited to them. Direct mentions and replies in threads Sam has already posted in are always processed across any member channel. Even when `SAM_CHANNEL` is set to restrict general incoming routing during testing or development, explicit `@sam` direct mentions remain fully active in all channels where Sam is a member.

The routing rules are:

- **Centralized broadcast channel:** Scheduled top-level posts (like the `daily-maintenance` status broadcast) are kept centralized and are always sent to the designated broadcast channel (the channel named `sam`, or configured in the environment).
- **Reactive replies in other channels:** In other channels, Sam never initiates top-level posts or scheduled broadcasts. Sam only responds reactively to direct `@sam` mentions or follow-up replies in threads where Sam participated.

## Posting to Slack

Sam posts via the Slack Web API using `SLACK_BOT_TOKEN`. The daemon doesn't post for Sam; Sam posts for itself, by calling the API as a tool. Same for setting status, opening streams, attaching feedback blocks.
Expand Down
83 changes: 59 additions & 24 deletions src/runtime/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ def __init__(self) -> None:
self.shutdown_event = asyncio.Event()
self.bot_user_id: Optional[str] = None
self.sam_channel_name: Optional[str] = None
# Channel name <-> id mappings resolved at startup via users.conversations
self._channel_id_by_name: dict[str, str] = {}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any tests possible for this mapping?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added in c0f9c8a — 13 tests in tests/runtime/test_channel_routing.py covering _resolve_member_channels (populates both dicts, skips nameless entries, paginates, degrades on partial API failure), broadcast_channel_id (SAM_CHANNEL precedence, sam-by-name fallback, SAM_BROADCAST_CHANNEL override, None when target missing), and _channel_allowed (narrows by default, bypasses with force_allow_member, rejects non-members). 183 passing locally.

self._channel_name_by_id: dict[str, str] = {}
# Channels Sam is a member of (resolved at startup via users.conversations).
# When SAM_CHANNEL is unset, this is the scope; when SAM_CHANNEL is set, it
# narrows scope further to just that one channel for dev/testing.
Expand Down Expand Up @@ -291,17 +294,38 @@ def _mark_seen(self, ts: str) -> bool:
self._seen_ts.discard(old)
return True

def _channel_allowed(self, channel: Optional[str]) -> bool:
@property
def broadcast_channel_id(self) -> Optional[str]:
"""Return the target channel ID for scheduled posts and top-level broadcasts.

If SAM_CHANNEL is explicitly set (dev/testing scope), that's the target.
Otherwise, look up the channel named "sam" in the member-channel map
resolved at startup. Returns None if Sam isn't a member of a channel
named "sam" — caller (cron path) treats None as "disable scheduled
broadcasts" rather than crashing.

Single source: `SAM_CHANNEL` env, then literal `"sam"` by name. No
third-layer env override — keeps configuration to one place per
`feedback_one_config_file`.
"""
if SAM_CHANNEL:
return SAM_CHANNEL
return self._channel_id_by_name.get("sam")

def _channel_allowed(self, channel: Optional[str], force_allow_member: bool = False) -> bool:
"""Channel admission check.

Sam responds in any channel it's a member of (set via Slack invites
and discovered at startup via `users.conversations`). If `SAM_CHANNEL`
is also set, scope narrows to that single channel — useful for
dev/testing against one room.

If force_allow_member is True, we allow any member channel even if
SAM_CHANNEL restricts general routing.
"""
if not channel:
return False
if SAM_CHANNEL:
if SAM_CHANNEL and not force_allow_member:
return channel == SAM_CHANNEL
return channel in self.member_channels

Expand Down Expand Up @@ -332,8 +356,12 @@ async def _resolve_member_channels(self) -> set[str]:
break
for ch in resp.get("channels", []):
ch_id = ch.get("id")
ch_name = ch.get("name")
if ch_id:
channels.add(ch_id)
if ch_name:
self._channel_id_by_name[ch_name] = ch_id
self._channel_name_by_id[ch_id] = ch_name
cursor = (resp.get("response_metadata") or {}).get("next_cursor")
if not cursor:
break
Expand Down Expand Up @@ -364,7 +392,8 @@ def _is_side_pane_event(self, event: dict) -> bool:
via `assistant_thread_started`.
"""
channel = event.get("channel")
if SAM_CHANNEL and channel == SAM_CHANNEL:
target_channel = self.broadcast_channel_id
if target_channel and channel == target_channel:
return False
if event.get("assistant_thread"):
return True
Expand Down Expand Up @@ -861,7 +890,7 @@ async def _send_side_pane_redirect(self, channel: str) -> None:
def _register_handlers(self) -> None:
@self.app.event("app_mention")
async def on_app_mention(event, client):
await self._handle_event(event)
await self._handle_event(event, force_allow_member=True)

@self.app.event("message")
async def on_message(event, client):
Expand All @@ -881,7 +910,7 @@ async def on_message(event, client):
if self._is_side_pane_event(event):
await self._send_side_pane_redirect(channel)
return
if not self._channel_allowed(channel):
if not self._channel_allowed(channel, force_allow_member=True):
return
# In a channel: only act on thread replies in threads where the
# bot has already posted. Top-level non-mention chatter is
Expand All @@ -891,7 +920,7 @@ async def on_message(event, client):
return
if not await self._bot_participates_in_thread(channel, thread_ts):
return
await self._handle_event(event)
await self._handle_event(event, force_allow_member=True)

@self.app.event("assistant_thread_started")
async def on_assistant_thread_started(event, client):
Expand All @@ -910,7 +939,7 @@ async def on_reaction_added(event, client):
async def on_reaction_removed(event, client):
log.info("reaction removed: %s on %s", event.get("reaction"), event.get("item"))

async def _handle_event(self, event: dict, *, recovered: bool = False) -> None:
async def _handle_event(self, event: dict, *, recovered: bool = False, force_allow_member: bool = False) -> None:
"""Route a Slack event into the session queue.

`recovered=True` is set by boot-time `_recover_from_reactions` so
Expand All @@ -926,7 +955,7 @@ async def _handle_event(self, event: dict, *, recovered: bool = False) -> None:
return

channel = event.get("channel")
if not self._channel_allowed(channel):
if not self._channel_allowed(channel, force_allow_member=force_allow_member):
log.info("ignoring event from non-whitelisted channel %s", channel)
return

Expand Down Expand Up @@ -1152,7 +1181,7 @@ async def _recover_from_reactions(self) -> int:
events = _select_non_terminal_from_reactions(
resp.get("items", []),
self.bot_user_id,
self._channel_allowed,
lambda ch: self._channel_allowed(ch, force_allow_member=True),
)
for event in events:
ts = event.get("ts") or ""
Expand Down Expand Up @@ -1569,8 +1598,9 @@ async def _run_cron_skill(self, skill_name: str, cron_expr: str) -> None:
skill_name,
)
return
if not SAM_CHANNEL:
log.info("scheduled skill %s: disabled (SAM_CHANNEL unset)", skill_name)
target_channel = self.broadcast_channel_id
if not target_channel:
log.info("scheduled skill %s: disabled (no broadcast channel found)", skill_name)
return
try:
iterator = croniter(cron_expr, datetime.now().astimezone())
Expand All @@ -1594,23 +1624,27 @@ async def _run_cron_skill(self, skill_name: str, cron_expr: str) -> None:
await self._enqueue_scheduled_skill(skill_name)

async def _enqueue_scheduled_skill(self, skill_name: str) -> None:
target_channel = self.broadcast_channel_id
if not target_channel:
log.error("cannot enqueue scheduled skill %s: no broadcast channel found", skill_name)
return
ts = f"{time.time():.6f}"
today_journal = f"/data/journal/{datetime.now().date().isoformat()}.md"
text = SCHEDULED_SKILL_TEMPLATE.format(
skill_name=skill_name,
today_journal=today_journal,
channel=SAM_CHANNEL,
channel=target_channel,
)
message = IncomingMessage(
channel=SAM_CHANNEL,
channel=target_channel,
user=self.bot_user_id or "scheduler",
text=text,
thread_ts=None,
event_ts=ts,
scheduled=True,
raw_event={},
)
log.info("scheduled skill %s: queuing (event_ts=%s)", skill_name, ts)
log.info("scheduled skill %s: queuing (event_ts=%s, channel=%s)", skill_name, ts, target_channel)
await self.queue.put(message)

async def run(self) -> None:
Expand All @@ -1626,16 +1660,6 @@ async def run(self) -> None:
except Exception:
log.exception("auth.test failed; will not recognize own messages reliably")

# Resolve the SAM_CHANNEL id to a name so we can refer to it by #name
# in the side-pane redirect copy.
if SAM_CHANNEL:
try:
info = await self.app.client.conversations_info(channel=SAM_CHANNEL)
self.sam_channel_name = (info.get("channel") or {}).get("name")
log.info("Sam channel resolved: #%s (%s)", self.sam_channel_name, SAM_CHANNEL)
except Exception:
log.exception("conversations.info failed; side-pane redirect will use a generic phrase")

# Resolve every channel Sam is a member of. Used by catch-up (when
# SAM_CHANNEL is unset) and by `_channel_allowed` for inbound routing.
# Adding Sam to a new channel takes effect on next daemon restart;
Expand All @@ -1645,6 +1669,17 @@ async def run(self) -> None:
self.member_channels = await self._resolve_member_channels()
log.info("Sam is a member of %d channel(s)", len(self.member_channels))

# Resolve the channel name so we can refer to it by #name in the side-pane
# redirect copy. Falls back to the broadcast channel if SAM_CHANNEL is unset.
target_channel = self.broadcast_channel_id
if target_channel:
try:
info = await self.app.client.conversations_info(channel=target_channel)
self.sam_channel_name = (info.get("channel") or {}).get("name")
log.info("Sam channel resolved: #%s (%s)", self.sam_channel_name, target_channel)
except Exception:
log.exception("conversations.info failed; side-pane redirect will use a generic phrase")

# Cloud Run startup probe requires an HTTP server on $PORT before the
# revision is marked healthy. Sam is otherwise outbound-only (Slack
# Socket Mode), so we expose a tiny aiohttp 200-OK endpoint just for
Expand Down
Loading
Loading