diff --git a/README.md b/README.md index 752ff3e..ff0c8aa 100644 --- a/README.md +++ b/README.md @@ -138,6 +138,25 @@ That's it. Tasks are queued to Redis, workers process them in parallel, progress ## Supported Patterns +### Activity Metadata (Multi-Tenancy) + +Attach arbitrary metadata when enqueueing tasks for filtering and tenant isolation: + +```python +task = await ax.enqueue( + "process_document", + context, + metadata={"organization_id": "org-123", "user_id": "user-456"} +) + +# Filter activities by metadata +activities = ax.activity.list(db, metadata_filter={"organization_id": "org-123"}) +detail = ax.activity.detail(db, agent_id, metadata_filter={"organization_id": "org-123"}) + +# Access metadata programmatically (excluded from API serialization by default) +org_id = detail.metadata["organization_id"] +``` + ### Automatic Activity Tracking Every task gets full lifecycle tracking without manual updates: @@ -386,6 +405,7 @@ Activity tracking uses SQLAlchemy with two tables: **`agentexec_activity`** - Main activity records - `agent_id` - Unique identifier (UUID) - `agent_type` - Task name +- `metadata` - JSON field for custom data (e.g., tenant info) - `created_at`, `updated_at` - Timestamps **`agentexec_activity_log`** - Status and progress @@ -733,4 +753,5 @@ MIT License - see [LICENSE](LICENSE) for details. - **npm**: [agentexec-ui](https://www.npmjs.com/package/agentexec-ui) - **Documentation**: [docs/](docs/) - **Example App**: [examples/openai-agents-fastapi/](examples/openai-agents-fastapi/) +- **Multi-Tenancy Example**: [examples/multi-tenancy/](examples/multi-tenancy/) - **Issues**: [GitHub Issues](https://github.com/Agent-CI/agentexec/issues) diff --git a/examples/multi-tenancy/README.md b/examples/multi-tenancy/README.md new file mode 100644 index 0000000..a086c5a --- /dev/null +++ b/examples/multi-tenancy/README.md @@ -0,0 +1,92 @@ +# Multi-Tenancy with Activity Metadata + +This example demonstrates how to use activity metadata for multi-tenant applications. + +## Overview + +When building multi-tenant applications, you often need to: +1. Associate background tasks with specific organizations/tenants +2. Filter activity views to only show tasks belonging to the current tenant +3. Ensure proper data isolation between tenants + +The `metadata` parameter on `ax.enqueue()` and `pipeline.enqueue()` enables this by attaching arbitrary key-value pairs to activity records. + +## Usage + +### Enqueueing with Metadata + +```python +import agentexec as ax + +# Enqueue a task with organization context +task = await ax.enqueue( + "process_document", + DocumentContext(file_id="doc-123"), + metadata={"organization_id": "org-456", "user_id": "user-789"} +) +``` + +### Filtering Activities by Metadata + +```python +from agentexec import activity + +# List only activities for a specific organization +activities = activity.list( + session, + metadata_filter={"organization_id": "org-456"} +) + +# Get activity detail with tenant validation +# Returns None if the activity doesn't belong to this organization +detail = activity.detail( + session, + agent_id="...", + metadata_filter={"organization_id": "org-456"} +) +``` + +### Pipeline Example + +```python +pipeline = ax.Pipeline(pool) + +class DocumentPipeline(pipeline.Base): + @pipeline.step(0) + async def extract(self, ctx: DocumentContext) -> ExtractedData: + ... + +# Enqueue pipeline with metadata +task = await pipeline.enqueue( + context=DocumentContext(file_id="doc-123"), + metadata={"organization_id": "org-456"} +) +``` + +## Running the Example + +```bash +# Install dependencies +pip install agentexec + +# Run the example +python example.py +``` + +## Database Schema + +The metadata is stored as a JSON column on the `agentexec_activity` table: + +```sql +ALTER TABLE agentexec_activity ADD COLUMN metadata JSON; +``` + +For PostgreSQL, this maps to JSONB which supports efficient filtering. + +## Notes + +- Metadata is immutable once set at enqueue time +- Filtering uses exact string matching on metadata values +- **Metadata is excluded from API serialization by default** to prevent accidental leakage of tenant info +- Access metadata programmatically via the `.metadata` attribute (e.g., `activity.metadata`) +- To include metadata in API responses, explicitly add it to your response model diff --git a/examples/multi-tenancy/example.py b/examples/multi-tenancy/example.py new file mode 100644 index 0000000..f4a29fb --- /dev/null +++ b/examples/multi-tenancy/example.py @@ -0,0 +1,188 @@ +"""Multi-tenancy example demonstrating activity metadata filtering. + +This example shows how to: +1. Attach metadata (like organization_id) when enqueueing tasks +2. Filter activities by metadata for tenant isolation +3. Use metadata in both list and detail views +""" + +import asyncio +from uuid import UUID + +from pydantic import BaseModel +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +import agentexec as ax + + +# --- Models --- + + +class DocumentContext(BaseModel): + """Input context for document processing.""" + + file_id: str + filename: str + + +class ProcessedDocument(BaseModel): + """Result of document processing.""" + + file_id: str + word_count: int + summary: str + + +# --- Setup --- + +# Create in-memory SQLite database for demo +engine = create_engine("sqlite:///multi_tenant_demo.db", echo=False) +SessionLocal = sessionmaker(bind=engine) + +# Create tables +ax.Base.metadata.create_all(bind=engine) + +# Create worker pool +pool = ax.Pool(engine=engine) + + +# --- Task Definition --- + + +@pool.task("process_document") +async def process_document( + *, + agent_id: UUID, + context: DocumentContext, +) -> ProcessedDocument: + """Simulate document processing.""" + # Simulate some work + ax.activity.update(agent_id, "Extracting text...", percentage=25) + await asyncio.sleep(0.1) + + ax.activity.update(agent_id, "Analyzing content...", percentage=50) + await asyncio.sleep(0.1) + + ax.activity.update(agent_id, "Generating summary...", percentage=75) + await asyncio.sleep(0.1) + + return ProcessedDocument( + file_id=context.file_id, + word_count=1234, + summary=f"Summary of {context.filename}", + ) + + +# --- Demo Functions --- + + +async def enqueue_tasks_for_tenants(): + """Enqueue tasks for different organizations.""" + print("\n=== Enqueueing Tasks ===\n") + + # Organization A: Enqueue 2 tasks + task1 = await ax.enqueue( + "process_document", + DocumentContext(file_id="doc-001", filename="report.pdf"), + metadata={"organization_id": "org-A", "user_id": "user-1"}, + ) + print(f"Org A - Task 1: {task1.agent_id}") + + task2 = await ax.enqueue( + "process_document", + DocumentContext(file_id="doc-002", filename="invoice.pdf"), + metadata={"organization_id": "org-A", "user_id": "user-2"}, + ) + print(f"Org A - Task 2: {task2.agent_id}") + + # Organization B: Enqueue 1 task + task3 = await ax.enqueue( + "process_document", + DocumentContext(file_id="doc-003", filename="contract.pdf"), + metadata={"organization_id": "org-B", "user_id": "user-3"}, + ) + print(f"Org B - Task 1: {task3.agent_id}") + + return task1, task2, task3 + + +def list_activities_for_tenant(org_id: str): + """List activities filtered by organization.""" + print(f"\n=== Activities for {org_id} ===\n") + + with SessionLocal() as session: + result = ax.activity.list( + session, + metadata_filter={"organization_id": org_id}, + ) + + print(f"Total activities: {result.total}") + for item in result.items: + print(f" - {item.agent_id}: {item.agent_type} ({item.status})") + print(f" Metadata: {item.metadata}") + + +def get_activity_detail_with_tenant_check(agent_id: UUID, org_id: str): + """Get activity detail with tenant validation.""" + print(f"\n=== Detail for {agent_id} (checking {org_id}) ===\n") + + with SessionLocal() as session: + # This returns None if the activity doesn't belong to the org + detail = ax.activity.detail( + session, + agent_id, + metadata_filter={"organization_id": org_id}, + ) + + if detail: + print(f"Found: {detail.agent_type}") + print(f"Metadata: {detail.metadata}") + print(f"Logs: {len(detail.logs)} entries") + else: + print(f"Not found (or doesn't belong to {org_id})") + + +async def main(): + """Run the multi-tenancy demo.""" + print("Multi-Tenancy Demo with Activity Metadata") + print("=" * 50) + + # 1. Enqueue tasks for different organizations + task1, task2, task3 = await enqueue_tasks_for_tenants() + + # 2. List all activities (no filter) + print("\n=== All Activities (no filter) ===\n") + with SessionLocal() as session: + all_activities = ax.activity.list(session) + print(f"Total: {all_activities.total}") + for item in all_activities.items: + print(f" - {item.agent_type}: {item.metadata}") + + # 3. List activities filtered by organization + list_activities_for_tenant("org-A") # Should show 2 tasks + list_activities_for_tenant("org-B") # Should show 1 task + list_activities_for_tenant("org-C") # Should show 0 tasks + + # 4. Detail view with tenant validation + # Try to access Org A's task as Org A (should work) + get_activity_detail_with_tenant_check(task1.agent_id, "org-A") + + # Try to access Org A's task as Org B (should return None) + get_activity_detail_with_tenant_check(task1.agent_id, "org-B") + + # 5. Filter by multiple metadata fields + print("\n=== Filter by org AND user ===\n") + with SessionLocal() as session: + result = ax.activity.list( + session, + metadata_filter={"organization_id": "org-A", "user_id": "user-1"}, + ) + print(f"Found {result.total} activities for org-A + user-1") + + print("\n" + "=" * 50) + print("Demo complete!") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/agentexec/activity/models.py b/src/agentexec/activity/models.py index 6ad4874..8d05565 100644 --- a/src/agentexec/activity/models.py +++ b/src/agentexec/activity/models.py @@ -3,11 +3,14 @@ import uuid from datetime import UTC, datetime +from typing import Any + from sqlalchemy import ( DateTime, Enum, ForeignKey, Integer, + JSON, String, Text, Uuid, @@ -58,6 +61,12 @@ def __tablename__(cls) -> str: default=lambda: datetime.now(UTC), onupdate=lambda: datetime.now(UTC), ) + metadata_: Mapped[dict[str, Any] | None] = mapped_column( + "metadata", + JSON, + nullable=True, + default=None, + ) logs: Mapped[list[ActivityLog]] = relationship( "ActivityLog", @@ -113,15 +122,19 @@ def get_by_agent_id( cls, session: Session, agent_id: str | uuid.UUID, + metadata_filter: dict[str, Any] | None = None, ) -> Activity | None: """Get an activity by agent_id. Args: session: SQLAlchemy session agent_id: The agent_id to look up (string or UUID) + metadata_filter: Optional dict of key-value pairs to filter by. + If provided and the activity's metadata doesn't match, + returns None (same as if not found). Returns: - Activity object or None if not found + Activity object or None if not found or metadata doesn't match Example: activity = Activity.get_by_agent_id(session, "abc-123") @@ -129,11 +142,26 @@ def get_by_agent_id( activity = Activity.get_by_agent_id(session, uuid.UUID("abc-123...")) if activity: print(f"Found activity: {activity.agent_type}") + + # With metadata filter (for multi-tenancy) + activity = Activity.get_by_agent_id( + session, + agent_id, + metadata_filter={"organization_id": "org-123"} + ) """ # Normalize to UUID if string if isinstance(agent_id, str): agent_id = uuid.UUID(agent_id) - return session.query(cls).filter_by(agent_id=agent_id).first() + + query = session.query(cls).filter_by(agent_id=agent_id) + + # Apply metadata filtering if provided + if metadata_filter: + for key, value in metadata_filter.items(): + query = query.filter(cls.metadata_[key].as_string() == str(value)) + + return query.first() @classmethod def get_list( @@ -141,6 +169,7 @@ def get_list( session: Session, page: int = 1, page_size: int = 50, + metadata_filter: dict[str, Any] | None = None, ) -> list[RowMapping]: """Get a paginated list of activities with summary information. @@ -148,16 +177,25 @@ def get_list( session: SQLAlchemy session to use for the query page: Page number (1-indexed) page_size: Number of items per page + metadata_filter: Optional dict of key-value pairs to filter by. + Activities must have metadata containing all specified keys + with exactly matching values. Returns: List of RowMapping objects (dict-like) with keys matching ActivitySummarySchema: agent_id, agent_type, latest_log_message, status, latest_log_timestamp, - percentage, started_at + percentage, started_at, metadata Example: results = Activity.get_list(session, page=1, page_size=20) for row in results: print(f"{row['agent_id']}: {row['latest_log_message']}") + + # Filter by organization + results = Activity.get_list( + session, + metadata_filter={"organization_id": "org-123"} + ) """ # Subquery to get the latest log for each agent latest_log_subq = select( @@ -198,6 +236,7 @@ def get_list( latest_log.c.created_at.label("latest_log_timestamp"), latest_log.c.percentage, started_at.c.started_at, + cls.metadata_.label("metadata"), ) .outerjoin( latest_log, @@ -206,6 +245,13 @@ def get_list( .outerjoin(started_at, cls.id == started_at.c.activity_id) ) + # Apply metadata filtering if provided + if metadata_filter: + for key, value in metadata_filter.items(): + # Use JSON path extraction for exact string matching + # This works across SQLite (for testing) and PostgreSQL (for production) + query = query.where(cls.metadata_[key].as_string() == str(value)) + # Custom ordering: active agents (running, queued) at the top is_active = case( (latest_log.c.status.in_([Status.RUNNING, Status.QUEUED]), 0), diff --git a/src/agentexec/activity/schemas.py b/src/agentexec/activity/schemas.py index f0ff930..e326348 100644 --- a/src/agentexec/activity/schemas.py +++ b/src/agentexec/activity/schemas.py @@ -1,5 +1,6 @@ import uuid from datetime import datetime +from typing import Any from pydantic import BaseModel, ConfigDict, Field, computed_field @@ -29,6 +30,7 @@ class ActivityDetailSchema(BaseModel): created_at: datetime updated_at: datetime logs: list[ActivityLogSchema] = Field(default_factory=list) + metadata: dict[str, Any] | None = Field(default=None, alias="metadata_", exclude=True) class ActivityListItemSchema(BaseModel): @@ -47,6 +49,7 @@ class ActivityListItemSchema(BaseModel): latest_log_timestamp: datetime | None = None percentage: int | None = 0 started_at: datetime | None = None + metadata: dict[str, Any] | None = Field(default=None, exclude=True) @computed_field def elapsed_time_seconds(self) -> int: diff --git a/src/agentexec/activity/tracker.py b/src/agentexec/activity/tracker.py index 9bcc7e8..12aeff8 100644 --- a/src/agentexec/activity/tracker.py +++ b/src/agentexec/activity/tracker.py @@ -1,4 +1,5 @@ import uuid +from typing import Any from sqlalchemy.orm import Session @@ -45,13 +46,17 @@ def create( message: str = "Agent queued", agent_id: str | uuid.UUID | None = None, session: Session | None = None, + metadata: dict[str, Any] | None = None, ) -> uuid.UUID: """Create a new agent activity record with initial queued status. Args: task_name: Name/type of the task (e.g., "research", "analysis") - initial_message: Initial log message (default: "Agent queued") + message: Initial log message (default: "Agent queued") agent_id: Optional custom agent ID (string or UUID). If not provided, one will be auto-generated. + session: Optional SQLAlchemy session. If not provided, uses global session factory. + metadata: Optional dict of arbitrary metadata to attach to the activity. + Useful for multi-tenancy (e.g., {"organization_id": "org-123"}). Returns: The agent_id (as UUID object) of the created record @@ -62,6 +67,7 @@ def create( activity_record = Activity( agent_id=agent_id, agent_type=task_name, + metadata_=metadata, ) db.add(activity_record) db.flush() @@ -208,6 +214,7 @@ def list( session: Session, page: int = 1, page_size: int = 50, + metadata_filter: dict[str, Any] | None = None, ) -> ActivityListSchema: """List activities with pagination. @@ -215,12 +222,26 @@ def list( session: SQLAlchemy session to use for the query page: Page number (1-indexed) page_size: Number of items per page + metadata_filter: Optional dict of key-value pairs to filter by. + Activities must have metadata containing all specified keys + with exactly matching values. Returns: ActivityList with list of ActivityListItemSchema items """ - total = session.query(Activity).count() - rows = Activity.get_list(session, page=page, page_size=page_size) + # Build base query for total count + query = session.query(Activity) + if metadata_filter: + for key, value in metadata_filter.items(): + query = query.filter(Activity.metadata_[key].as_string() == str(value)) + total = query.count() + + rows = Activity.get_list( + session, + page=page, + page_size=page_size, + metadata_filter=metadata_filter, + ) return ActivityListSchema( items=[ActivityListItemSchema.model_validate(row) for row in rows], @@ -233,17 +254,22 @@ def list( def detail( session: Session, agent_id: str | uuid.UUID, + metadata_filter: dict[str, Any] | None = None, ) -> ActivityDetailSchema | None: """Get a single activity by agent_id with all logs. Args: session: SQLAlchemy session to use for the query agent_id: The agent_id to look up + metadata_filter: Optional dict of key-value pairs to filter by. + If provided and the activity's metadata doesn't match, + returns None (same as if not found). Returns: ActivityDetailSchema with full log history, or None if not found + or if metadata doesn't match """ - if item := Activity.get_by_agent_id(session, agent_id): + if item := Activity.get_by_agent_id(session, agent_id, metadata_filter=metadata_filter): return ActivityDetailSchema.model_validate(item) return None diff --git a/src/agentexec/core/queue.py b/src/agentexec/core/queue.py index bf0328b..56b5316 100644 --- a/src/agentexec/core/queue.py +++ b/src/agentexec/core/queue.py @@ -29,6 +29,7 @@ async def enqueue( *, priority: Priority = Priority.LOW, queue_name: str | None = None, + metadata: dict[str, Any] | None = None, ) -> Task: """Enqueue a task for background execution. @@ -40,6 +41,8 @@ async def enqueue( context: Task context as a Pydantic BaseModel. priority: Task priority (Priority.HIGH or Priority.LOW). queue_name: Queue name. Defaults to CONF.queue_name. + metadata: Optional dict of arbitrary metadata to attach to the activity. + Useful for multi-tenancy (e.g., {"organization_id": "org-123"}). Returns: Task instance with typed context and agent_id for tracking. @@ -50,6 +53,13 @@ async def research(agent_id: UUID, context: ResearchContext): ... task = await ax.enqueue("research_company", ResearchContext(company="Acme")) + + # With metadata for multi-tenancy + task = await ax.enqueue( + "research_company", + ResearchContext(company="Acme"), + metadata={"organization_id": "org-123"} + ) """ push_func = { Priority.HIGH: state.backend.rpush, @@ -59,6 +69,7 @@ async def research(agent_id: UUID, context: ResearchContext): task = Task.create( task_name=task_name, context=context, + metadata=metadata, ) push_func( queue_name or CONF.queue_name, diff --git a/src/agentexec/core/task.py b/src/agentexec/core/task.py index cc1221d..0c6471f 100644 --- a/src/agentexec/core/task.py +++ b/src/agentexec/core/task.py @@ -215,7 +215,12 @@ def from_serialized(cls, definition: TaskDefinition, data: dict[str, Any]) -> Ta return task @classmethod - def create(cls, task_name: str, context: BaseModel) -> Task: + def create( + cls, + task_name: str, + context: BaseModel, + metadata: dict[str, Any] | None = None, + ) -> Task: """Create a new task with automatic activity tracking. This is a convenience method that creates both a Task instance and @@ -224,6 +229,8 @@ def create(cls, task_name: str, context: BaseModel) -> Task: Args: task_name: Name/type of the task (e.g., "research", "analysis") context: Task context as a Pydantic model + metadata: Optional dict of arbitrary metadata to attach to the activity. + Useful for multi-tenancy (e.g., {"organization_id": "org-123"}). Returns: Task instance with agent_id set @@ -232,10 +239,18 @@ def create(cls, task_name: str, context: BaseModel) -> Task: ctx = ResearchContext(company="Acme") task = Task.create("research_company", ctx) task.context.company # Typed access + + # With metadata for multi-tenancy + task = Task.create( + "research_company", + ctx, + metadata={"organization_id": "org-123"} + ) """ agent_id = activity.create( task_name=task_name, message=CONF.activity_message_create, + metadata=metadata, ) return cls( diff --git a/src/agentexec/pipeline.py b/src/agentexec/pipeline.py index 1e063c6..9801e79 100644 --- a/src/agentexec/pipeline.py +++ b/src/agentexec/pipeline.py @@ -11,9 +11,9 @@ TypeAlias, Union, cast, - get_type_hints, - get_origin, get_args, + get_origin, + get_type_hints, ) from uuid import UUID @@ -320,17 +320,23 @@ def decorator(func: StepHandler) -> StepHandler: return decorator - async def enqueue(self, context: BaseModel) -> Task: + async def enqueue( + self, + context: BaseModel, + metadata: dict[str, Any] | None = None, + ) -> Task: """Enqueue the pipeline to run on a worker. Args: context: Initial context passed to the first step + metadata: Optional dict of arbitrary metadata to attach to the activity. + Useful for multi-tenancy (e.g., {"organization_id": "org-123"}). Returns: Task instance for tracking the pipeline execution """ self._validate_type_flow() - return await queue.enqueue(self.name, context) + return await queue.enqueue(self.name, context, metadata=metadata) async def run(self, context: BaseModel) -> StepResult: """Execute the pipeline inline (blocking). diff --git a/tests/test_activity_tracking.py b/tests/test_activity_tracking.py index 2893552..5b5d0fa 100644 --- a/tests/test_activity_tracking.py +++ b/tests/test_activity_tracking.py @@ -353,3 +353,213 @@ def test_create_activity_with_string_agent_id(db_session: Session): ) assert agent_id == custom_id + + +# --- Metadata Tests --- + + +def test_create_activity_with_metadata(db_session: Session): + """Test creating activity with metadata.""" + agent_id = activity.create( + task_name="metadata_task", + message="Test with metadata", + session=db_session, + metadata={"organization_id": "org-123", "user_id": "user-456"}, + ) + + activity_record = Activity.get_by_agent_id(db_session, agent_id) + assert activity_record is not None + assert activity_record.metadata_ == {"organization_id": "org-123", "user_id": "user-456"} + + +def test_create_activity_without_metadata(db_session: Session): + """Test that metadata is None by default.""" + agent_id = activity.create( + task_name="no_metadata_task", + message="Test without metadata", + session=db_session, + ) + + activity_record = Activity.get_by_agent_id(db_session, agent_id) + assert activity_record is not None + assert activity_record.metadata_ is None + + +def test_list_activities_with_metadata_filter(db_session: Session): + """Test filtering activities by metadata.""" + # Create activities for different organizations + activity.create( + task_name="task_org_a", + message="Org A task", + session=db_session, + metadata={"organization_id": "org-A"}, + ) + activity.create( + task_name="task_org_a_2", + message="Org A task 2", + session=db_session, + metadata={"organization_id": "org-A"}, + ) + activity.create( + task_name="task_org_b", + message="Org B task", + session=db_session, + metadata={"organization_id": "org-B"}, + ) + + # Filter by org-A + result = activity.list( + db_session, + metadata_filter={"organization_id": "org-A"}, + ) + assert result.total == 2 + assert len(result.items) == 2 + for item in result.items: + assert item.metadata["organization_id"] == "org-A" + + # Filter by org-B + result = activity.list( + db_session, + metadata_filter={"organization_id": "org-B"}, + ) + assert result.total == 1 + assert result.items[0].metadata["organization_id"] == "org-B" + + # Filter by non-existent org + result = activity.list( + db_session, + metadata_filter={"organization_id": "org-C"}, + ) + assert result.total == 0 + + +def test_list_activities_with_multiple_metadata_filters(db_session: Session): + """Test filtering activities by multiple metadata fields.""" + activity.create( + task_name="task_1", + message="User 1 in Org A", + session=db_session, + metadata={"organization_id": "org-A", "user_id": "user-1"}, + ) + activity.create( + task_name="task_2", + message="User 2 in Org A", + session=db_session, + metadata={"organization_id": "org-A", "user_id": "user-2"}, + ) + + # Filter by both org and user + result = activity.list( + db_session, + metadata_filter={"organization_id": "org-A", "user_id": "user-1"}, + ) + assert result.total == 1 + + +def test_detail_activity_with_metadata(db_session: Session): + """Test getting activity detail includes metadata.""" + agent_id = activity.create( + task_name="detailed_metadata_task", + message="Test", + session=db_session, + metadata={"organization_id": "org-123"}, + ) + + result = activity.detail(db_session, agent_id) + assert result is not None + assert result.metadata == {"organization_id": "org-123"} + + +def test_detail_activity_with_metadata_filter_match(db_session: Session): + """Test detail returns activity when metadata filter matches.""" + agent_id = activity.create( + task_name="filter_match_task", + message="Test", + session=db_session, + metadata={"organization_id": "org-A"}, + ) + + result = activity.detail( + db_session, + agent_id, + metadata_filter={"organization_id": "org-A"}, + ) + assert result is not None + assert result.agent_id == agent_id + + +def test_detail_activity_with_metadata_filter_no_match(db_session: Session): + """Test detail returns None when metadata filter doesn't match.""" + agent_id = activity.create( + task_name="filter_no_match_task", + message="Test", + session=db_session, + metadata={"organization_id": "org-A"}, + ) + + # Try to access with wrong organization + result = activity.detail( + db_session, + agent_id, + metadata_filter={"organization_id": "org-B"}, + ) + assert result is None + + +def test_detail_activity_no_metadata_with_filter(db_session: Session): + """Test detail returns None when activity has no metadata but filter is applied.""" + agent_id = activity.create( + task_name="no_metadata_with_filter", + message="Test", + session=db_session, + ) + + result = activity.detail( + db_session, + agent_id, + metadata_filter={"organization_id": "org-A"}, + ) + assert result is None + + +def test_list_metadata_accessible_as_attribute(db_session: Session): + """Test that metadata is accessible as an attribute on schema objects.""" + activity.create( + task_name="list_metadata_task", + message="Test", + session=db_session, + metadata={"key1": "value1", "key2": "value2"}, + ) + + result = activity.list(db_session) + assert result.total == 1 + # Metadata is accessible as attribute for programmatic use + assert result.items[0].metadata == {"key1": "value1", "key2": "value2"} + + +def test_metadata_excluded_from_serialization(db_session: Session): + """Test that metadata is excluded from JSON/dict serialization by default. + + This prevents accidental leakage of tenant info through API responses. + Users who want metadata in responses should explicitly include it. + """ + agent_id = activity.create( + task_name="serialization_test", + message="Test", + session=db_session, + metadata={"organization_id": "org-123", "secret": "sensitive"}, + ) + + # List view - metadata excluded from serialization + result = activity.list(db_session) + item_dict = result.items[0].model_dump() + assert "metadata" not in item_dict + + # Detail view - metadata excluded from serialization + detail = activity.detail(db_session, agent_id) + detail_dict = detail.model_dump() + assert "metadata" not in detail_dict + + # But still accessible as attribute for internal use + assert result.items[0].metadata == {"organization_id": "org-123", "secret": "sensitive"} + assert detail.metadata == {"organization_id": "org-123", "secret": "sensitive"}